【问题标题】:netty4: MessageToMessageDecoder<DatagramPacket> gives only first 2048 octetsnetty4: MessageToMessageDecoder<DatagramPacket> 仅给出前 2048 个八位字节
【发布时间】:2015-10-12 01:03:56
【问题描述】:

我正在使用 Camel 2.15.3camel-netty4,自从从 camel-netty3 升级后,我在通过 UDP 接收完整 JSON 消息时遇到问题。每条 JSON 消息大约 3 到 5 KB,但我的 MessageToMessageDecoder 实现只给了我前 2048 个(即 2k 字节)。从一个测试程序中,我发送了一条 UDP 消息,并且从我的 MessageToMessageDecoder 中的调试打印显示,decode() 方法只被调用了一次。

我目前正在阅读 Netty In Action,但我在日志文件中看到了这一点:UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))

我迫切需要在生产中解决这个问题,只需要能够通过 UDP 接收 JSON 消息并通过我的 Camel 路由发送它们。我不知道什么是最好的框架(如果有的话)?

使用 netty3 这工作正常,我有一个 UdpPacketDecoder 实现 ChannelUpstreamHandler 调用 Channels.fireMessageReceived(ctx, message, me.getRemoteAddress()) 来触发消息到下一个处理程序,它似乎工作正常。

我的路线如下所示。它从 netty4:udp 消费并生成到 SEDA 队列,现在只是在测试时:

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <route startupOrder="104" customId="true" id="ROUTE_ID_RAW_CQMS_EVENTS" xmlns="http://camel.apache.org/schema/spring">
            <from uri="netty4:udp://devserver-09.dev.s.mission.net:11111?serverPipelineFactory=#CQMS_SERVER_PIPELINE_FACTORY_ROUTE_ID_RAW_CQMS_EVENTS&amp;keepAlive=true&amp;sync=false&amp;receiveBufferSize=26214400&amp;sendBufferSize=26214400&amp;allowDefaultCodec=false&amp;disconnectOnNoReply=false&amp;receiveBufferSizePredictor=8192"/>
            <setProperty propertyName="CamelCharsetName" id="setProperty1">
                    <expressionDefinition>iso-8859-1</expressionDefinition>
            </setProperty>
            <threads poolSize="7" maxPoolSize="14" threadName="threads_ROUTE_ID_RAW_CQMS_EVENTS" callerRunsWhenRejected="true" id="threads1">
                    <to uri="seda:SEDA_INPUT_QUEUE_102?size=200000&amp;concurrentConsumers=10&amp;waitForTaskToComplete=Never&amp;failIfNoConsumers=true&amp;timeout=10000" id="to1"/>
                    <setProperty propertyName="CamelCharsetName" id="setProperty2">
                            <expressionDefinition>iso-8859-1</expressionDefinition>
                    </setProperty>
            </threads>
    </route>

我打印出收到的DatagramPacket,显示如下:UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))

这是我的 MessageToMessageDecoder 实现:

    package com.mission.mplr.multiprotocollistenerrouter;

    import com.vonage.mplr.utils.MiscUtils;
    import io.netty.channel.ChannelHandlerContext; // Represents the "binding" between a ChannelHandler and the ChannelPipeline.
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import java.nio.charset.Charset;
    import java.util.List;
    import org.slf4j.Logger;        // The org.slf4j.Logger interface is the main user entry point of SLF4J API.
    import org.slf4j.LoggerFactory; // Utility class producing Loggers for various logging APIs, most notably for log4j.


    public class UdpDatagramDecoder extends MessageToMessageDecoder<DatagramPacket> {
            private static final Logger logger      = LoggerFactory.getLogger(UdpDatagramDecoder.class);
            private static final Logger errorLogger = LoggerFactory.getLogger("ERROR_LOGGER");
            private final String CHARSET_NAME;

            UdpDatagramDecoder(String charsetName) {
                    this.CHARSET_NAME = charsetName;
            }

            @Override

            public boolean acceptInboundMessage(Object msg) throws Exception {
                    return true;
            }

            @Override
            protected void decode(ChannelHandlerContext chc, DatagramPacket packet, List out) throws Exception {
                    logger.info("decode(): ENTER");

                    logger.info("decode(): Received datagram = {}", packet);

                    String packetAsString = packet.content().toString(Charset.forName(CHARSET_NAME));

                    if(packetAsString == null) {
                            return; // Nothing to do
                    } else {
                            out.add(packetAsString);
                            packet.retain();
                    }

                    logger.info("decode(): bodyBytesAsString[size={}] = {}", packetAsString.length(), packetAsString);

                    String bodyBytesAsHex = MiscUtils.stringAsHex(packetAsString, CHARSET_NAME);
                    logger.info("decode(): bodyBytesAsHex[size={}] = {}", bodyBytesAsHex.length(), bodyBytesAsHex);

                    logger.info("decode(): EXIT");
            }
    }
    // ------------- end --------------

我的服务器管道有这个 initChannel() 实现:

@Override
protected void initChannel(Channel ch) throws Exception {
    logger.trace("initChannel(): ENTER");

    ChannelPipeline channelPipeline = ch.pipeline();
    serverInvoked = true;  

    String theSourceRouteId = consumer.getRoute().getId();
    logger.debug("initChannel(): consumer = {}, theSourceRouteId = {}", consumer.toString(), theSourceRouteId);

    // -------------------------------------------------------------------
    // Here we add the custom UDP datagram decoder. Decoders are typically
    // stateful, thus we create a new instance with every pipeline.
    // -------------------------------------------------------------------
    String udpPacketDecoderName = "CQMS_UDP_DATAGRAM_DECODER_" + theSourceRouteId;
    logger.debug("initChannel(): Adding {}", udpPacketDecoderName);
    channelPipeline.addLast(udpPacketDecoderName, new UdpDatagramDecoder(CHARSET_NAME));

    // -----------------------------------------------------------------------------------------
    // Default Camel ServerChannelHandler for the consumer, to allow Camel to route the message.
    // -----------------------------------------------------------------------------------------
    String serverChannelHandlerName = "CQMS_SERVER_CHANNEL_HANDLER_" + theSourceRouteId;
    logger.debug("initChannel(): Adding {}", serverChannelHandlerName);
    channelPipeline.addLast(serverChannelHandlerName, new ServerChannelHandler(consumer));

    logger.trace("initChannel(): EXIT");
} 

【问题讨论】:

  • 您对这些 UDP 数据报大小相当乐观。普遍接受的 UDP 数据报安全限制是 534 或 576,永远记不得是哪个。

标签: json udp apache-camel netty


【解决方案1】:

Netty 默认使用 2048 作为数据报包的上限。您可以通过在 Bootstrap 上设置您自己的 FixedRecvByteBufAllocator 实例来更改此设置。不过,不确定如何通过 Camel 完成此操作。

【讨论】:

    【解决方案2】:

    非常感谢诺曼!以下是适用于 Camel 2.15.3 的解决方案。 基本上,我们从应用程序的配置中读取上限,并在 ServerInitializerFactory 的 initChannel(Channel ch) 方法中进行设置。

    @Override
    protected void initChannel(Channel ch) throws Exception {
    
        ChannelPipeline channelPipeline = ch.pipeline();
        serverInvoked = true;  
    
        // -------------------------------------------------------------------
        // Here we add the custom UDP datagram decoder. Decoders are typically
        // stateful, thus we create a new instance with every pipeline.
        // -------------------------------------------------------------------
        String udpDecoderName = "UDP_DECODER_" + theSourceRouteId;
        channelPipeline.addLast(udpDecoderName, new UdpPacketDecoder_ADAPTER(CHARSET_NAME));
    
        // ---------------------------------------------------------------------
        // Netty4 has default of 2048 bytes as upper limit for datagram packets.
        // Here we override the default upper limit based on a config param.
        // ---------------------------------------------------------------------  
        if(ConfigManager.getInstance().getRecvByteBufAllocator() > 0) {
            ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(ConfigManager.getInstance().getRecvByteBufAllocator()));
        }
        // -----------------------------------------------------------
        // Add string encoder (downstream) / string decoder (upstream)
        // -----------------------------------------------------------
        // For decoding from a ChannelBuffer to a String object
        String stringDecoderName = "SERVER_PIPELINE_STRING_DECODER_" + theSourceRouteId;
        channelPipeline.addLast(stringDecoderName, STR_DECODER);
    
        // For encoding from a String object into a ChannelBuffer
        String stringEncoderName = "SERVER_PIPELINE_STRING_ENCODER_" + theSourceRouteId;
        channelPipeline.addLast(stringEncoderName, STR_ENCODER);
    
        // For encoding from a String object into a DatagramPacket
        String datagramPacketEncoderName = "SERVER_PIPELINE_DATAGRAM_PACKET_ENCODER_" + theSourceRouteId;
        channelPipeline.addLast(datagramPacketEncoderName, DATAGRAM_PACKET_ENCODER);
    
        // -----------------------------------------------------------------------------------------
        // Default Camel ServerChannelHandler for the consumer, to allow Camel to route the message.
        // -----------------------------------------------------------------------------------------
        String serverChannelHandlerName = "SERVER_CHANNEL_HANDLER_" + theSourceRouteId;
        channelPipeline.addLast(serverChannelHandlerName, new ServerChannelHandler(consumer));
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-06-23
      • 1970-01-01
      • 2011-08-01
      • 2011-06-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多