【问题标题】:Messages not reaching destination queue when using ServerInitializerFactory Netty 4使用 ServerInitializerFactory Netty 4 时消息未到达目标队列
【发布时间】:2015-09-17 09:07:19
【问题描述】:

我在 grails 中使用 apache camel netty4,并且我已经声明 mycustom ServerInitializerFactory 如下

public class MyServerInitializerFactory extends ServerInitializerFactory {
    private int maxLineSize = 1048576;
    NettyConsumer nettyConsumer

    public MimacsServerInitializerFactory() {}

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline()
        pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO))
        pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, maxLineSize, 2, 2, 6, 0, false))
        pipeline.addLast("decoder", new MfuDecoder())
        pipeline.addLast("encoder", new MfuEncoder())
        pipeline.addLast("handler", new MyServerHandler())
    }
}

我有一条路线,我在路线构建器中设置如下。

from('netty4:tcp://192.168.254.3:553?serverInitializerFactory=#sif&keepAlive=true&sync=true&allowDefaultCodec=false').to('activemq:queue:Tracking.Queue')

我的 Camel Context 在 BootStrap.groovy 中设置如下

def serverInitializerFactory = new MyServerInitializerFactory()
SimpleRegistry registry = new SimpleRegistry()
registry.put("sif", serverInitializerFactory)

CamelContext camelContext = new DefaultCamelContext(registry)
camelContext.addComponent("activemq",  activeMQComponent.activeMQComponent("failover:tcp://localhost:61616"))
camelContext.addRoutes new TrackingMessageRoute()
camelContext.start()

当我运行我的应用程序时,我的路由已启动,并且我的成帧器、解码器、处理程序和编码器都被调用,但消息没有到达跟踪。队列和响应不会返回给客户端。

如果我不在 netty url 和用户编码器和解码器中使用 serverInitializerFactory,我的消息将进入队列,但我无法控制我想要为收到的每种类型的消息发送的确认。似乎activemq 试图发送自己的响应,但被我的编码器拒绝。

我应该然后编写代码以再次发送还是我遗漏了什么?

【问题讨论】:

    标签: grails apache-camel activemq netty


    【解决方案1】:

    您需要为消费者添加一个处理程序,以便可以对其进行路由,请参阅单元测试它是如何完成的:

    【讨论】:

    • 感谢您的回复。如果您检查我发布的代码 sn-ps,我有一个类似的处理程序。我的处理程序中是否需要将消息发送到队列的代码。由于队列是我骆驼路线中的目的地,因此消息不应该只是进入队列吗?
    【解决方案2】:

    我设法解决了这个问题。在我的 channelRead0 方法中。我添加了以下几行

    Exchange exchange = this.consumer.getEndpoint().createExchange(ctx, msg);
    

    其中ctx是ChannelContextHandler,msg是Message Object,两者都是channelRead0方法的参数。

    我还添加了以下几行

    this.consumer.createUoW(exchange);
    

    在我的处理代码之后,我插入了以下行

    this.consumer.doneUoW(exchange);
    

    一切都像魅力一样。

    【讨论】:

      猜你喜欢
      • 2014-08-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-05-22
      • 2021-07-21
      • 1970-01-01
      相关资源
      最近更新 更多