【问题标题】:Camel and Netty: client - intermediary app - serverCamel 和 Netty:客户端 - 中介应用程序 - 服务器
【发布时间】:2017-12-23 11:49:28
【问题描述】:

我正在使用 Camel 和 Netty 设置一个场景,其中客户端连接到服务器,中间有一个应用程序(只是一个名为 Router 的虚拟应用程序)。

SocketClient 连接到Router(端口53379),Router 连接Server(端口53383)。问题是数据包永远不会到达服务器(虽然它确实到达了路由器,但我使用处理器对其进行了调试)。

如果我将 SocketClient 连接到服务器,它工作正常。

感谢任何帮助。

完整来源

路由器:

public class Router {
    public static void main(String[] args) throws Exception {
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("ByteArrayEncoder", new ByteArrayEncoder());
        registry.put("ByteArrayDecoder", new ByteArrayDecoder());
        
        CamelContext context = new DefaultCamelContext(registry);
        
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("netty4:tcp://localhost:53379?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true")
                .to("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true");               
            }
        });

        context.start();
        while(true) {
            Thread.sleep(1000);
        }
    }
}

服务器:

public class Server {
    public static void main(String[] args) throws Exception {
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("ByteArrayEncoder", new ByteArrayEncoder());
        registry.put("ByteArrayDecoder", new ByteArrayDecoder());
        
        CamelContext context = new DefaultCamelContext(registry);
        
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        Message message = exchange.getIn();
                        System.out.println("from este_stub: " + message.getBody());
                        exchange.setOut(message);
                    }
                });
            }
        });

        context.start();
        while(true) {
            Thread.sleep(1000);
        }
    }
}

客户:

public class SocketClient {
    public static void main(String[] args) throws UnknownHostException, IOException {
        Socket socket = new Socket("localhost", 53379);
        OutputStream simOutStream = socket.getOutputStream();
        BufferedInputStream simInStream = new BufferedInputStream(socket.getInputStream());        
        
        byte[] arr = new byte[] {1, 2, 3, 4, 5};
        simOutStream.write(arr, 0, arr.length);
        simOutStream.flush();
        
        
        byte[] resp = new byte[5];
        simInStream.read(resp, 0, resp.length);
        for(byte ar : resp)
            System.out.print(ar);
        
        socket.close();
    }
}

【问题讨论】:

    标签: apache-camel netty


    【解决方案1】:

    运行您共享的代码会在路由器类io.netty.channel.ChannelPipelineException: io.netty.handler.codec.bytes.ByteArrayDecoder is not a @Sharable handler, so can't be added or removed multiple times. 中出现以下错误

    正如例外所说,ByteArrayDecoderByteArrayEncoder 不是可共享的处理程序,因此是错误的原因。从 Sharable 的 javadoc 中,它明确指出

    表示可以将带注释的 ChannelHandler 的同一个实例多次添加到一个或多个 ChannelPipeline 中而无需竞争条件。如果未指定此注解,则每次将其添加到管道时都必须创建一个新的处理程序实例,因为它具有成员变量等非共享状态。

    netty4 的骆驼文档在 Camel netty4 也说明了这一点

    如下:

    如果您的编码器或解码器不可共享(例如,它们具有@Shareable 类注释),那么您的编码器/解码器必须实现 org.apache.camel.component.netty.ChannelHandlerFactory 接口,并在newChannelHandler 方法。这是为了确保编码器/解码器可以安全使用。如果不是这种情况,则 Netty 组件将在以下情况下记录一个 WARN 创建了一个端点。 Netty 组件提供了一个 org.apache.camel.component.netty.ChannelHandlerFactories 工厂类,它有许多常用的方法。

    所以你的问题可以通过使用一些自定义的编码器和解码器来实现 ChannelHandlerFactory 并覆盖 newChannelHandler ,如下所示:

    公共类 CustomByteArrayDecoder 实现 ChannelHandlerFactory {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
    }
    
    @Override
    public ChannelHandler newChannelHandler() {
        return new ByteArrayDecoder();
    }
    

    对于编码器也是如此。然后在路由器和服务器中进行适当的更改,如下所示:

    SimpleRegistry registry = new SimpleRegistry();
    registry.put("ByteArrayEncoder", new CustomByteArrayEncoder());
    registry.put("ByteArrayDecoder", new CustomByteArrayDecoder());
    

    【讨论】:

    • 谢谢!当你说异常时,我注意到我忘记设置 log4j.properties 文件。顺便说一句,实现 ChannelFactory 只需要 newChannelHandler() 方法。
    • 太好了。还要检查 Camel netty4 文档,因为他们在配置端点时提到了一些重要的选项。
    猜你喜欢
    • 2013-11-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-09-17
    • 2016-12-03
    相关资源
    最近更新 更多