【问题标题】:Non-blocking sockets非阻塞套接字
【发布时间】:2010-10-09 02:30:59
【问题描述】:

在 Java 中实现非阻塞套接字的最佳方法是什么?

或者有这样的事情吗?我有一个通过套接字与服务器通信的程序,但如果数据/连接出现问题,我不希望套接字调用阻塞/导致延迟。

【问题讨论】:

    标签: java sockets network-programming blocking nonblocking


    【解决方案1】:

    Java 非阻塞套接字,在 Java 2 标准版 1.4 中引入,允许应用程序之间的网络通信,而不会阻塞使用套接字的进程。但什么是非阻塞套接字,它在哪些情况下有用,它是如何工作的?

    什么是非阻塞套接字?

    非阻塞套接字允许在通道上进行 I/O 操作,而不会阻塞使用它的进程。这意味着,我们可以使用单个线程来处理多个并发连接并获得“异步高性能”读/写操作(可能有人不同意)

    好的,它在哪些情况下有用?

    假设您想实现一个接受不同客户端连接的服务器。还假设您希望服务器能够同时处理多个请求。使用传统方式开发这样的服务器有两种选择:

    • 实现一个多线程服务器,为每个连接手动处理一个线程。
    • 使用外部第三方模块。

    两种解决方案都有效,但采用第一个解决方案必须开发整个线程管理解决方案,并存在相关的并发和冲突问题。第二种解决方案使应用程序依赖于非 JDK 外部模块,并且您可能必须使该库适应您的需要。通过非阻塞套接字,您可以实现一个非阻塞服务器,而无需直接管理线程或求助于外部模块。

    它是如何工作的?

    在详细介绍之前,您需要了解几个术语:

    • 在基于 NIO 的实现中,我们不是将数据写入输出流并从输入流读取数据,而是从 缓冲区 读取和写入数据。 buffer 可以定义为临时存储。
    • Channel 将大量数据传输进出缓冲区。此外,它可以被视为通信的端点。
    • Readiness Selection 是一个概念,指的是“选择在读取或写入数据时不会阻塞的套接字的能力。”

    Java NIO 有一个名为Selector 的类,它允许单个线程检查多个通道上的 I/O 事件。这怎么可能?好吧,selector 可以检查通道的“就绪”,以了解客户端尝试连接或读/写操作等事件。也就是说,Selector 的每个实例都可以监控更多的 socket 通道,从而可以监控更多的连接。现在,当通道上发生某些事情(事件发生)时,selector 会通知应用程序处理请求selector 通过创建 事件键(或选择键)来实现,它们是 SelectionKey 类的实例。每个key 都包含有关提出请求的人员请求的类型的信息,如图 1 所示。

    图一:结构图

    基本实现

    服务器实现包含一个无限循环,其中selector 等待事件并创建事件键。密钥有四种可能的类型:

    • 可接受:关联的客户端请求连接。
    • 可连接:服务器接受了连接。
    • 可读:服务器可以读取。
    • 可写:服务器可以写。

    通常acceptable 密钥是在服务器端创建的。实际上,这种密钥只是通知服务器客户端需要连接,然后服务器将套接字通道个性化并将其关联到选择器以进行读/写操作。在此之后,当接受的客户端读取或写入某些内容时,选择器将为该客户端创建 readablewriteable 键..

    现在您已准备好按照建议的算法用 Java 编写服务器。套接字通道的创建、selector 和套接字选择器注册可以这样进行:

    final String HOSTNAME = "127.0.0.1";
    final int PORT = 8511;
    
    // This is how you open a ServerSocketChannel
    serverChannel = ServerSocketChannel.open();
    // You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
    serverChannel.configureBlocking(false);
    // bind to the address that you will use to Serve.
    serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));
    
    // This is how you open a Selector
    selector = Selector.open();
    /*
     * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
     * This means that you just told your selector that this channel will be used to accept connections.
     * We can change this operation later to read/write, more on this later.
     */
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    

    首先,我们使用ServerSocketChannel.open() 方法创建SocketChannel 的实例。接下来,configureBlocking(false) 调用将此 channel 设置为非阻塞。与服务器的连接是通过serverChannel.socket().bind() 方法建立的。 HOSTNAME代表服务器的IP地址,PORT是通讯端口。最后调用Selector.open()方法创建一个selector实例并注册到channel和注册类型。在此示例中,注册类型为OP_ACCEPT,这意味着选择器仅报告客户端尝试连接到服务器。其他可能的选项是:OP_CONNECT,将由客户端使用; OP_READ;和OP_WRITE

    现在我们需要使用无限循环来处理这些请求。一个简单的方法如下:

    // Run the server as long as the thread is not interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        /*
         * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
         * For example, if a client connects right this second, then it will break from the select()
         * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
         * block undefinable.
         */
        selector.select(TIMEOUT);
    
        /*
         * If we are here, it is because an operation happened (or the TIMEOUT expired).
         * We need to get the SelectionKeys from the selector to see what operations are available.
         * We use an iterator for this.
         */
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
    
        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            // remove the key so that we don't process this OPERATION again.
            keys.remove();
    
            // key could be invalid if for example, the client closed the connection.
            if (!key.isValid()) {
                continue;
            }
            /*
             * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
             * If the key from the keyset is Acceptable, then we must get ready to accept the client
             * connection and do something with it. Go read the comments in the accept method.
             */
            if (key.isAcceptable()) {
                System.out.println("Accepting connection");
                accept(key);
            }
            /*
             * If you already read the comments in the accept() method, then you know we changed
             * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
             * a channel that is writable (key.isWritable()). The write() method will explain further.
             */
            if (key.isWritable()) {
                System.out.println("Writing...");
                write(key);
            }
            /*
             * If you already read the comments in the write method then you understand that we registered
             * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
             * that is ready to read (key.isReadable()). The read() method will explain further.
             */
            if (key.isReadable()) {
                System.out.println("Reading connection");
                read(key);
            }
        }
    }
    

    你可以找到the implementation source here

    注意:异步服务器

    作为非阻塞实现的替代方案,我们可以部署异步服务器。例如,您可以使用AsynchronousServerSocketChannel 类,它为面向流的侦听套接字提供了一个异步通道。

    要使用它,首先执行它的静态open() 方法,然后将bind() 执行到特定的端口。接下来,您将执行其accept() 方法,将一个实现CompletionHandler 接口的类传递给它。大多数情况下,您会发现该处理程序被创建为匿名内部类

    从这个AsynchronousServerSocketChannel 对象中,您调用accept() 来告诉它开始侦听连接,并将自定义CompletionHandler 实例传递给它。当我们调用accept() 时,它会立即返回。请注意,这与传统的阻塞方法不同; accept() 方法在客户端连接到它之前会阻塞,而 AsynchronousServerSocketChannel accept() 方法会为您处理。

    这里有一个例子:

    public class NioSocketServer
    {
        public NioSocketServer()
        {
            try {
                // Create an AsynchronousServerSocketChannel that will listen on port 5000
                final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                        .open()
                        .bind(new InetSocketAddress(5000));
    
                // Listen for a new request
                listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
                {
                    @Override
                    public void completed(AsynchronousSocketChannel ch, Void att)
                    {
                        // Accept the next connection
                        listener.accept(null, this);
    
                        // Greet the client
                        ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));
    
                        // Allocate a byte buffer (4K) to read from the client
                        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                        try {
                            // Read the first line
                            int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
    
                            boolean running = true;
                            while (bytesRead != -1 && running) {
                                System.out.println("bytes read: " + bytesRead);
    
                                // Make sure that we have data to read
                                if (byteBuffer.position() > 2) {
                                    // Make the buffer ready to read
                                    byteBuffer.flip();
    
                                    // Convert the buffer into a line
                                    byte[] lineBytes = new byte[bytesRead];
                                    byteBuffer.get(lineBytes, 0, bytesRead);
                                    String line = new String(lineBytes);
    
                                    // Debug
                                    System.out.println("Message: " + line);
    
                                    // Echo back to the caller
                                    ch.write(ByteBuffer.wrap(line.getBytes()));
    
                                    // Make the buffer ready to write
                                    byteBuffer.clear();
    
                                    // Read the next line
                                    bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                                } else {
                                    // An empty line signifies the end of the conversation in our protocol
                                    running = false;
                                }
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        } catch (TimeoutException e) {
                            // The user exceeded the 20 second timeout, so close the connection
                            ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                            System.out.println("Connection timed out, closing connection");
                        }
    
                        System.out.println("End of conversation");
                        try {
                            // Close the connection if we need to
                            if (ch.isOpen()) {
                                ch.close();
                            }
                        } catch (I/OException e1)
                        {
                            e1.printStackTrace();
                        }
                    }
    
                    @Override
                    public void failed(Throwable exc, Void att)
                    {
                        ///...
                    }
                });
            } catch (I/OException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args)
        {
            NioSocketServer server = new NioSocketServer();
            try {
                Thread.sleep(60000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    你可以找到the full code here

    【讨论】:

    • 哈!从来不知道异步版本。生活变得越来越简单。
    • 这里有很多错误和混乱。您说选择器创建事件键。它没有。它只会更新它们。 SelectionKeysSelectableChannel.register() 创建。 “通常在服务器端创建可接受的密钥”应​​为“始终”。 “首先我们用 ServerSocketChannel.open() 方法创建一个 SocketChannel 的实例”是不正确的。 bind() 不创建连接。 Selector.open() 不执行注册。监听套接字不是“面向流的”。异步代码忽略 write() 调用返回的 Future
    • 第二个为每个传入连接创建一个线程。刚刚通过将System.out.println("New connection added to " + Thread.currentThread().getName()); 添加到完成的方法来测试它。
    • 并且accept() 不会告诉频道开始监听连接。它已经在这样做了。
    • @Teocci 如何使用异步处理程序来实现对 Modbus 的调用?喜欢这里:Modbus Pulse Coils.
    【解决方案2】:

    在 Java 中实现非阻塞套接字的最佳方式是什么?

    只有一种方法。 SocketChannel.configureBlocking(false).

    请注意,其中一些答案是不正确的。 SocketChannel.configureBlocking(false) 将其置于非阻塞模式。你不需要Selector 来做到这一点。您只需要一个Selector 来实现超时或多路复用 I/O 与非阻塞套接字。

    【讨论】:

      【解决方案3】:

      除了使用非阻塞 IO 之外,您可能会发现为连接设置一个写入线程要简单得多。

      注意:如果你只需要几千个连接,每个连接一到两个线程会更简单。如果每台服务器有大约一万个或更多连接,则需要 NIO 和选择器。

      【讨论】:

      • 更简单,但不可扩展。由于程序消耗资源的开销,太多的线程和你的电脑不再喜欢你了。但这一切都取决于您的个人需求。
      • @JasperLankhorst 是的,您可以拥有的连接数限制为几千个,每个连接有 1/2 个线程。使用选择器,您可以拥有 10 倍或更多。
      【解决方案4】:

      java.nio 包提供了 Selector 就像在 C 中一样工作。

      【讨论】:

        【解决方案5】:

        我刚刚写了这段代码。它运作良好。这是上面答案中提到的 Java NIO 的一个示例,但我在这里发布了代码。

        ServerSocketChannel ssc = null;
        try {
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(port));
            ssc.configureBlocking(false);
            while (true) {
                SocketChannel sc = ssc.accept();
                if (sc == null) {
                    // No connections came .
                } else {
                    // You got a connection. Do something
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        

        【讨论】:

        • 无法使用。当没有连接时,此代码将自旋循环。一个健全的版本将使用阻塞模式或Selector.
        • 我认为这种解决方案与阻塞方法非常相似,使用非阻塞方法的想法是创建一个处理程序来在事件发生时处理接受。当你使用这个 while (true) 来获取套接字时,就像你在阻塞线程直到客户端连接发生。
        • 这将通过阻塞服务器套接字通道得到改善,并且您仍然可以拥有非阻塞连接。
        • @Teocci 这就像抽 CPU 而不是使用阻塞操作。这确实是非常糟糕的做法。
        猜你喜欢
        • 1970-01-01
        • 2010-10-31
        • 2013-10-15
        • 2013-06-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-04-18
        • 2020-02-18
        相关资源
        最近更新 更多