【问题标题】:how to serve 1000s of concurrent connection using Java NIO如何使用 Java NIO 提供 1000 个并发连接
【发布时间】:2017-05-15 07:12:41
【问题描述】:

我的发件人每秒发送 10000 个请求(甚至更多),但我的 ServerSocketChannel 只能读取和处理(在线程中)8000 个请求 (~appx)。

虚拟代码是这样的:

public class NioReceiver {
    private int port = -1;
    private static String message = null;
    public void receive() throws IOException {
        // Get the selector
        Selector selector = Selector.open();
        // Selector is open for making connection
        // Get the server socket channel and register using selector
        ServerSocketChannel SS = ServerSocketChannel.open();
        InetSocketAddress hostAddress = new InetSocketAddress(this.port);
        SS.bind(hostAddress);
        SS.configureBlocking(false);
        int ops = SS.validOps();
        SelectionKey selectKy = SS.register(selector, ops, null);
        for (;;) {
            //Waiting for the select operation...
            int noOfKeys = selector.select();
            // The Number of selected keys are: noOfKeys
            Set selectedKeys = selector.selectedKeys();
            Iterator itr = selectedKeys.iterator();
            while (itr.hasNext()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024 * 60);
                SelectionKey ky = (SelectionKey) itr.next();
                if (ky.isAcceptable()) {
                    // The new client connection is accepted
                    SocketChannel client = SS.accept();
                    client.configureBlocking(false);
                    // The new connection is added to a selector
                    client.register(selector, SelectionKey.OP_READ);
                    // The new connection is accepted from the client: client
                } else if (ky.isReadable()) {
                    // Data is read from the client
                    SocketChannel client = (SocketChannel) ky.channel();
                    String output = null;
                    buffer.clear();
                    int charRead = -1;
                    try {
                        charRead = client.read(buffer);
                    } catch (IOException e) {
                        continue;
                    }
                    if (charRead <= 0) {
                        // client closed
                        client.close();
                    } else {
                        output = new String(buffer.array());
                        message = output;
                        try {
                            new Thread(() -> {
                                processAndStore(message);
                            }).start();
                        } catch (Exception e) {
                            System.err.println("Thread exception:::" + e.getMessage());
                        }
                    } // else if of client.isConnected()
                } // else if of ky.isReadable()
                itr.remove();
            } // end of while loop
        } // end of for loop
    }

    public void processAndStore(String output) {
        String exchangeName = null;
        String dataLine = null;
        String Lines[] = output.split("\r\n");
        for (int i = 0; i < Lines.length; i++) {
            if (Lines[i].contains("Host: ")) {
                exchangeName = Lines[i].substring(6);
            }
            if (Lines[i].isEmpty()) {
                dataLine = Lines[i + 1];
            }
        }
        StringBuffer updatedLastLine = null;
        if (dataLine != null) {
            if (dataLine.contains("POST")) {
                updatedLastLine = new StringBuffer(dataLine.substring(0, dataLine.indexOf("POST")));
            } else {
                updatedLastLine = new StringBuffer(dataLine);
            }
            if (!dataLine.equals("")) {
                try {
                    if (updatedLastLine.lastIndexOf("}") != -1) {
                        updatedLastLine.replace(updatedLastLine.lastIndexOf("}"), updatedLastLine.lastIndexOf("}") + 1, ",\"name\":\"" + exchangeName
                                + "\"}");
                    } else {

                        return;
                    }
                } catch (StringIndexOutOfBoundsException e) {
                    System.out.println(updatedLastLine + "::" + dataLine);
                    System.out.println(e);
                }
                store(updatedLastLine.toString());
            }
        }
    }

    public NioReceiver(int port) {
        this.port = port;
    }
}

当我删除处理逻辑时,它能够接收更多请求,但不是全部。

如何改进我的代码以接收所有 10000 个传入请求。

【问题讨论】:

  • 买一台速度更快的电脑?向外扩展? --- 使用线程池/消息队列而不是创建 1000 个线程来调用 processAndStore()。启动一个线程是昂贵的。每秒启动 10000 个线程? Yikes! --- Profile code 看看瓶颈在哪里,而不是猜测,但这里有一些猜测:1)不要使用StringBuffer,使用@ 987654324@。 2)不要给lastIndexOf("}")打三遍。
  • NIO 的目的是减少所需线程的数量。你似乎没有收到消息。不清楚你在问什么。
  • 好的,谢谢您的及时答复。与此同时,我所做的是我删除了线程的创建,并且性能有所提高。此外,我跳过NIO 并使用基于netty 的接收器和SimpleChannelInboundHandler,并且在相同的硬件配置下,我能够接收几乎所有的请求。

标签: java multithreading sockets nio channel


【解决方案1】:

使用线程池/消息队列而不是创建 1000 个线程来调用 processAndStore()

启动线程昂贵

开始每秒 10000 个线程? 哎呀!

正如@EJP 在comment 中所说:

NIO 的目的是减少所需线程的数量。您似乎没有收到消息。


除此之外,profile您的代码以查看瓶颈在哪里,而不是猜测。

但是,无论如何,这里有一些猜测:

  1. 不要使用StringBuffer,使用StringBuilder
    原因:参见Difference between StringBuilder and StringBuffer

  2. 不要调用lastIndexOf("}") 三次。
    原因: lastIndexOf() 是顺序搜索,所以比较慢。 JVM 可能会或可能不会优化多个调用,但如果性能至关重要,请不要依赖它。通过将结果分配给变量来自己做。另见Does Java optimize method calls via an interface which has a single implementor marked as final?

【讨论】:

  • 是的。完美的!谢谢老兄。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-12
  • 1970-01-01
  • 1970-01-01
  • 2015-01-09
  • 1970-01-01
  • 2011-06-12
相关资源
最近更新 更多