【问题标题】:TCP Server configuration in Mule - writing into client socketMule 中的 TCP 服务器配置 - 写入客户端套接字
【发布时间】:2015-03-12 18:40:40
【问题描述】:

我正在尝试使用 TCP 入站端点创建 mule 流,该端点是侦听端口的 TCP 服务器。当识别出成功的客户端连接后,在接收到来自客户端的任何请求之前,我需要将一条消息写入套接字(这让客户端知道我正在侦听),然后客户端才会向我发送进一步的请求。这就是我使用示例 java 程序的方法:

import java.net.*; 
import java.io.*; 

public class TCPServer 
{ 
 public static void main(String[] args) throws IOException 
   { 
    ServerSocket serverSocket = null; 

    try { 
         serverSocket = new ServerSocket(4445); 
        } 
    catch (IOException e) 
        { 
         System.err.println("Could not listen on port: 4445."); 
         System.exit(1); 
        } 

    Socket clientSocket = null; 
    System.out.println ("Waiting for connection.....");

    try { 
         clientSocket = serverSocket.accept(); 
        } 
    catch (IOException e) 
        { 
         System.err.println("Accept failed."); 
         System.exit(1); 
        } 

    System.out.println ("Connection successful");
    System.out.println ("Sending output message - .....");

    //Sending a message to the client to indicate that the server is active
    PrintStream pingStream = new PrintStream(clientSocket.getOutputStream());
    pingStream.print("Server listening");
    pingStream.flush();

    //Now start listening for messages 
    System.out.println ("Waiting for incoming message - .....");
    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true); 
    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 

    String inputLine; 

    while ((inputLine = in.readLine()) != null) 
        { 
         System.out.println ("Server: " + inputLine); 
         out.println(inputLine); 

         if (inputLine.equals("Bye.")) 
             break; 
        } 

    out.close(); 
    in.close(); 
    clientSocket.close(); 
    serverSocket.close(); 
   } 
} 

我尝试将 Mule 的 TCP 入站端点用作服务器,但我无法看到如何识别来自客户端的成功连接,以触发出站消息。仅当从客户端发送消息时才会触发流。有没有一种方法可以扩展 Mule TCP 连接器的功能并拥有一个可以满足上述要求的侦听器?

根据提供的答案,这就是我的实现方式 -

public class TCPMuleOut extends TcpMessageReceiver {

    boolean InitConnection = false;
    Socket clientSocket = null;

    public TCPMuleOut(Connector connector, FlowConstruct flowConstruct,
            InboundEndpoint endpoint) throws CreateException {
        super(connector, flowConstruct, endpoint);
    }

    protected Work createWork(Socket socket) throws IOException {
        return new MyTcpWorker(socket, this);
    }


    protected class MyTcpWorker extends TcpMessageReceiver.TcpWorker {

        public MyTcpWorker(Socket socket, AbstractMessageReceiver receiver)
                throws IOException {
            super(socket, receiver);
            // TODO Auto-generated constructor stub
        }

        @Override
        protected Object getNextMessage(Object resource) throws Exception {
            if (InitConnection == false) {

                clientSocket = this.socket;
                logger.debug("Sending logon message");
                PrintStream pingStream = new PrintStream(
                        clientSocket.getOutputStream());
                pingStream.print("Log on message");
                pingStream.flush();
                InitConnection = true;
            }

            long keepAliveTimeout = ((TcpConnector) connector)
                    .getKeepAliveTimeout();

            Object readMsg = null;
            try {
                // Create a monitor if expiry was set
                if (keepAliveTimeout > 0) {
                    ((TcpConnector) connector).getKeepAliveMonitor()
                            .addExpirable(keepAliveTimeout,
                                    TimeUnit.MILLISECONDS, this);
                }

                readMsg = protocol.read(dataIn);

                // There was some action so we can clear the monitor
                ((TcpConnector) connector).getKeepAliveMonitor()
                        .removeExpirable(this);

                if (dataIn.isStreaming()) {
                }

                return readMsg;
            } catch (SocketTimeoutException e) {
                ((TcpConnector) connector).getKeepAliveMonitor()
                        .removeExpirable(this);
                System.out.println("Socket timeout");
            } finally {
                if (readMsg == null) {
                    // Protocols can return a null object, which means we're
                    // done
                    // reading messages for now and can mark the stream for
                    // closing later.
                    // Also, exceptions can be thrown, in which case we're done
                    // reading.
                    dataIn.close();
                    InitConnection = false;
                    logger.debug("Client closed");
                }
            }
            return null;
        }
    }
} 

TCP连接器如下:

<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="100000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="100000" socketSoLinger="0"
    validateConnections="true" keepAlive="true">
    <receiver-threading-profile
        maxThreadsActive="5" maxThreadsIdle="5" />
    <reconnect-forever />
    <service-overrides messageReceiver="TCPMuleOut" />
    <tcp:direct-protocol payloadOnly="true" />
</tcp:connector>

【问题讨论】:

    标签: mule


    【解决方案1】:

    您尝试做的事情有点难以完成,但并非不可能。消息由org.mule.transport.tcp.TcpMessageReceiver 类接收,并且该类始终使用输入流中的数据来创建注入流中的消息。 但是,您可以通过在流的 tcp 连接器(记录在 here)中添加 service-overrides 标记并替换 messageReceiver 元素来扩展该接收器并指示 TCP 模块使用您的接收器。 在您的扩展接收器中,您应该更改TcpWorker.getNextMessage 方法,以便在从输入流中读取之前发送确认消息。 HTH,马科斯。

    【讨论】:

    • 这是个好建议。我会用另一种选择来平衡它:jvas 可以使用 DevKit 创建一个自定义模块,而不是扩展 Mule 的类(它可以工作,但会让您了解内部 API 更改)。后者会给他工作室支持作为奖励:)
    • 谢谢 MarcosNC 和@David Dossot。我可以通过扩展 TCpMessageReceiver 类来做到这一点。我将更新我的解决方案作为答案。
    猜你喜欢
    • 1970-01-01
    • 2016-02-08
    • 1970-01-01
    • 2019-01-31
    • 2012-11-15
    • 2016-03-18
    • 1970-01-01
    • 2018-07-02
    • 1970-01-01
    相关资源
    最近更新 更多