【问题标题】:How to make a Java NIO (Non blocking IO) based TCP server using Disruptor?如何使用 Disruptor 制作基于 Java NIO(非阻塞 IO)的 TCP 服务器?
【发布时间】:2019-04-24 10:01:59
【问题描述】:

我正在尝试使用 Disruptor 实现一个基于 JAVA NIO 的 TCP 服务器。

Java NIO 以非阻塞方式工作。所有新连接首先命中 ServerAccept Socket。然后使用键(从 selector.select() 返回)方法,适当的处理程序(如果键是可接受的,则创建一个新的套接字通道,并且通道被注册到选择器,如果键是可读的,则内容从通道读取,然后注册写入,如果密钥是可写的,则通道被写入响应应该有的任何内容)被调用。最简单的基于 NIO 的服务器在单个线程中工作(所有处理程序和选择器在同一个线程中)。

Java Disruptor 是一种高性能的 Ring 实现,可用于在不同组件(线程)之间传递消息。

我的问题如下。

  1. 我们可以在 NIO 设计中使用多线程吗?

  2. 我们可以在单独的线程中运行 eventHandlers 吗?

  3. 如果我们可以在单独的线程中运行 eventHandlers,我们如何在线程之间传递 selectionKeys 和通道?

  4. java Disruptor 库能否用于在主线程(运行选择器的线程)和 eventHandler 线程之间传输数据?

  5. 如果可能,设计方法是什么? (Disruptor 中 EventProducer、EventConsumer 和 RingBuffer 的行为是什么?)

【问题讨论】:

  • “不起作用”不是问题描述。您不需要注册 OP_WRITE 或接收可写入的密钥来写入,除非先前的写入返回零。没有UDP连接之类的东西。不清楚你在问什么。
  • @user207421 你是什么意思“你不需要注册 OP_WRITE ”?从通道读取内容后,必须在选择器中注册通道以进行写入。只有当通道准备好写入时,我们才能写入通道
  • 我的意思是频道不需要必须注册才能写作。你就写吧。只有当你得到零返回时,你才需要注册 OP_WRITE,当你得到它时,你重复写入,如果成功,你再次取消注册 OP_WRITE。

标签: java nio disruptor-pattern


【解决方案1】:

您可以使用任何线程消息传递方法创建基于 NIO 的服务器,其中中断器就是这样一种选择。

这里,您需要解决的问题是如何将工作共享给不同的线程(而不是在主线程本身处理请求)。

因此,您可以使用中断器作为消息传递方法,将从套接字连接获得的缓冲区传递给单独的线程。此外,您需要维护一个共享的并发 hashmap 来通知主线程(运行事件循环)响应是否准备好。下面是一个例子。

HttpEvent.java

import java.nio.ByteBuffer;

public class HttpEvent
{
    private ByteBuffer buffer;
    private String requestId;
    private int numRead;


    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public int getNumRead() {
        return numRead;
    }

    public void setNumRead(int numRead) {
        this.numRead = numRead;
    }
}

HttpEventFactory.java

import com.lmax.disruptor.EventFactory;

public class HttpEventFactory implements EventFactory<HttpEvent>
{
    public HttpEvent newInstance()
    {
        return new HttpEvent();
    }
}

HttpEventHandler.java

import com.lmax.disruptor.EventHandler;

import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;

public class HttpEventHandler implements EventHandler<HttpEvent>
{
    private int id;
    private ConcurrentHashMap concurrentHashMap;

    public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
        this.id = id;
        this.concurrentHashMap = concurrentHashMap;

    }

    public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        if( sequence % Runtime.getRuntime().availableProcessors()==id){


            String requestId = event.getRequestId();
            ByteBuffer buffer = event.getBuffer();
            int numRead= event.getNumRead();

            ByteBuffer responseBuffer = handleRequest(buffer, numRead);


            this.concurrentHashMap.put(requestId, responseBuffer);

        }
    }

    private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {

        buffer.flip();
        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        String request = new String(data, "US-ASCII");
        request = request.split("\n")[0].trim();


        String response = serverRequest(request);

        buffer.clear();

        buffer.put(response.getBytes());
        return  buffer;
    }

    private String serverRequest(String request) throws Exception {
        String response = "Sample Response";
        if (request.startsWith("GET")) {

            // http request parsing and response generation should be done here.    


        return  response;
    }
}

HttpEventMain.java

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class HttpEventMain
{
    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer ;
    private ConcurrentHashMap concurrentHashMapResponse;
    private ConcurrentHashMap concurrentHashMapKey;

    public HttpEventMain(InetAddress addr, int port) throws IOException {
        this.setAddr(addr);
        this.setPort(port);
        this.setConcurrentHashMapResponse(new ConcurrentHashMap<>());
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }


    public static void main(String[] args) throws Exception
    {
        System.out.println("----- Running the server on machine with "+Runtime.getRuntime().availableProcessors()+" cores -----");

        HttpEventMain server = new HttpEventMain(null, 4333);



        HttpEventFactory factory = new HttpEventFactory();


        int bufferSize = 1024;


        Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks


        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);

        HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];

        for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }


        disruptor.handleEventsWith(handlers);




        disruptor.start();


        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();

        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));

        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: "+ InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());

        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }

        try {

            server.start();

        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            System.exit(0);
        }



    }
    private void start() throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);


        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server ready. Ctrl-C to stop.");

        while (true) {

            this.selector.select();


            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                keys.remove();

                if (! key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    this.accept(key);
                }
                else if (key.isReadable()) {
                    this.read(key);
                }
                else if (key.isWritable()) {
                    this.write(key);
                }
            }
        }

    }

    private void accept(SelectionKey key) throws IOException {

        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);


        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();

        channel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {

        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        }
        catch (IOException e) {
            e.printStackTrace();
        }

        if (numRead == -1) {

            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();
            channel.close();
            key.cancel();
            return;

        }
        String requestID = RandomStringUtils.random(15, true, true);

        while(concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)){
            requestID = RandomStringUtils.random(15, true, true);
        }

        concurrentHashMapKey.put(key, requestID);

        this.producer.onData(requestID, buffer, numRead);

        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    private boolean responseReady(SelectionKey key){

        String requestId = concurrentHashMapKey.get(key).toString();
        String response = concurrentHashMapResponse.get(requestId).toString();

        if(response!="0"){
            concurrentHashMapKey.remove(key);
            concurrentHashMapResponse.remove(requestId);
            return true;
        }else{
            return false;
        }

    }

    private void write(SelectionKey key) throws IOException {

        if(responseReady(key)) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
            inputBuffer.flip();
            channel.write(inputBuffer);
            channel.close();
            key.cancel();

        }

    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}

HttpEventProducer.java

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

public class HttpEventProducer
{
    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap concurrentHashMap;

    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap concurrentHashMap)
    {
        this.ringBuffer = ringBuffer;
        this.concurrentHashMap = concurrentHashMap;
    }

    public void onData(String requestId, ByteBuffer buffer, int numRead)
    {
        long sequence = ringBuffer.next();

        try
        {
            HttpEvent event = ringBuffer.get(sequence);
            event.setBuffer(buffer);
            event.setRequestId(requestId);
            event.setNumRead(numRead);
        }
        finally
        {
            concurrentHashMap.put(requestId, "0");
            ringBuffer.publish(sequence);


        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-12
    • 1970-01-01
    • 1970-01-01
    • 2011-09-04
    • 1970-01-01
    • 2022-12-09
    • 2010-11-30
    • 1970-01-01
    相关资源
    最近更新 更多