【问题标题】:Netty BufferSizePredictor truncates UDP datagramNetty BufferSizePredictor 截断 UDP 数据报
【发布时间】:2015-09-01 11:13:14
【问题描述】:

我正在尝试开发可以接收自定义 UDP 数据包的自定义 Flume 源。 这是我的代码:

public class XvlrUdpSource extends AbstractSource
        implements EventDrivenSource, Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);


    private int port;
    private String host;
    private Channel nettyChannel;

    private static final Logger logger = LoggerFactory.getLogger(XvlrUdpSource.class);

    private CounterGroup counterGroup = new CounterGroup();

    public class XvlrUpdHander extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                System.out.println("class: "+ mEvent.getMessage().getClass());
                /** ChannelBuffer holds just first 768 bytes of the whole input UDP packet*/
                ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                   Event xvlrPacketEvent = EventBuilder.withBody( ((ChannelBuffer)mEvent.getMessage()).array());
                System.out.println("Length is:["+xvlrPacketEvent.getBody().length+"]");
                //Event e = syslogUtils.extractEvent((ChannelBuffer)mEvent.getMessage());
                if(xvlrPacketEvent == null){
                    return;
                }
                getChannelProcessor().processEvent(xvlrPacketEvent);
                counterGroup.incrementAndGet("events.success");
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                logger.error("Error writting to channel", ex);
                return;
            }
        }
    }

    @Override
    public void start() {
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
                (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
        final XvlrUpdHander handler = new XvlrUpdHander();
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });

        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

        super.start();
    }

    @Override
    public void stop() {
        logger.info("Syslog UDP Source stopping...");
        logger.info("Metrics:{}", counterGroup);
        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(
                context, "port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        port = context.getInteger("port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        host = context.getString("host");//SyslogSourceConfigurationConstants.CONFIG_HOST);
        //formaterProp = context.getSubProperties("PROP");//SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
    }

}

我在 messageRecieved 上进行了调试,并在堆栈跟踪中看到:

/**
     * Sends a {@code "messageReceived"} event to the first
     * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
     * the specified {@link Channel} belongs.
     *
     * @param message        the received message
     * @param remoteAddress  the remote address where the received message
     *                       came from
     */
    public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
        channel.getPipeline().sendUpstream(
                new UpstreamMessageEvent(channel, message, remoteAddress));
    }

我的对象消息已经是 768 字节长度了。

根在这里org.jboss.netty.channel.socket.oio.OioDatagramWorker

byte[] buf = new byte[predictor.nextReceiveBufferSize()];
            DatagramPacket packet = new DatagramPacket(buf, buf.length);

Predictor 将缓冲区大小设置为 768 那么:

fireMessageReceived(
                    channel,
                    channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
                    packet.getSocketAddress());

我只得到前 768 个字节。 有没有机会改变预测者的行为?

【问题讨论】:

    标签: networking udp netty


    【解决方案1】:

    我发现了这个话题: Netty Different Pipeline Per UDP Datagram

    可以使用特殊属性“注入”具有所需行为的预测器。 所以完整的解决方案是:

    public class XvlrUdpSource extends AbstractSource
            implements EventDrivenSource, Configurable {
    
        private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);
    
    
        private int port;
        private String host;
        private Channel nettyChannel;
    
        private CounterGroup counterGroup = new CounterGroup();
    
        public class XvlrUpdHander extends SimpleChannelHandler {
    
            @Override
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
                try {
                    ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                    int actualSizeOfUdpPacket = channelBuffer.readableBytes();
                    byte[] body = Arrays.copyOf(channelBuffer.array(), actualSizeOfUdpPacket);
                    Event xvlrPacketEvent = EventBuilder.withBody(body);
                    LOG.debug("Event.body length is: {} ", xvlrPacketEvent.getBody().length);
                    if(xvlrPacketEvent == null){
                        return;
                    }
                    getChannelProcessor().processEvent(xvlrPacketEvent);
                    counterGroup.incrementAndGet("events.success");
                } catch (ChannelException ex) {
                    counterGroup.incrementAndGet("events.dropped");
                    LOG.error("Error writting to channel", ex);
                    return;
                }
            }
        }
    
    
        @Override
        public void start() {
            OioDatagramChannelFactory oioDatagramChannelFactory =   new OioDatagramChannelFactory(                                                                           Executors.newCachedThreadPool());
            ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(oioDatagramChannelFactory);
            serverBootstrap.setOption("sendBufferSize", 65536);
            serverBootstrap.setOption("receiveBufferSize", 65536);
            serverBootstrap.setOption("receiveBufferSizePredictorFactory",
                                        new AdaptiveReceiveBufferSizePredictorFactory(8192, 8192, 16384));
    
    
            final XvlrUpdHander handler = new XvlrUpdHander();
    
            serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    return Channels.pipeline(handler);
                }
            });
            if (host == null) {
                nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
            } else {
                nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
            }
    
        }
    
            @Override
        public void stop() {
            LOG.info("Syslog UDP Source stopping...");
            LOG.info("Metrics:{}", counterGroup);
            if (nettyChannel != null) {
                nettyChannel.close();
                try {
                    nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    LOG.warn("netty server stop interrupted", e);
                } finally {
                    nettyChannel = null;
                }
            }
    
            super.stop();
        }
    
        @Override
        public void configure(Context context) {
            Configurables.ensureRequiredNonNull(context, "port");
            port = context.getInteger("port");
            host = context.getString("host");
       }
    }
    

    【讨论】:

      【解决方案2】:

      要么发送 768 字节,要么接收缓冲区只有 768 字节长。它当然与回车无关,除非你的代码中有一些错误的处理。

      【讨论】:

      • 好的,我会尝试在发送方添加调试。我没有 100% 的证据证明它将整个数据包准确地发送到这个特定的 Flume 源中。
      • 我确实添加了调试到基于 Apache mina 的客户端。将相同的数据传输到手工制作的 UDP 服务器时,它工作正常。我的目标是用 Flume UDP 源组件替换它。
      猜你喜欢
      • 1970-01-01
      • 2012-05-07
      • 2017-05-13
      • 2011-02-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多