【问题标题】:Exception using AsynchronousSocketChannel, ByteBuffer and Future使用 AsynchronousSocketChannel、ByteBuffer 和 Future 的异常
【发布时间】:2015-10-23 10:54:16
【问题描述】:

有一个任务:使用AsynchronousSocketChannel 进行函数的多线程执行。为了模拟服务器端的长时间工作,我使用了Thread.sleep()(存储在Worker 类中)。如果我sleep()线程超过2秒,当使用ByteBufferFuture在客户端获取数据时,苍蝇java.lang.IndexOutOfBoundsException(我的函数printFuncResult)请告诉我,这是什么问题?

服务器:

public class Server {
public static final InetSocketAddress hostAddress = new InetSocketAddress("localhost", 5678);

private AsynchronousServerSocketChannel serverChannel;
private AsynchronousSocketChannel clientChannel;
ExecutorService threadPool;

public Server() throws IOException, ExecutionException, InterruptedException {
    serverChannel = AsynchronousServerSocketChannel.open();
    serverChannel.bind(hostAddress);
    System.out.println("Server channel bound to port: " + hostAddress.getPort());
    System.out.println("Waiting for client to connect... ");
    getClientChannel();
    handleArguments();
}

private void getClientChannel() throws ExecutionException, InterruptedException {
    Future<AsynchronousSocketChannel> acceptResult = serverChannel.accept();
    clientChannel = acceptResult.get();
}

private void handleArguments() throws IOException, InterruptedException {
    if ((clientChannel != null) && (clientChannel.isOpen())) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        Future<Integer> result = clientChannel.read(buffer);
        while (! result.isDone()) {
          //  System.out.println("Result coming... ");
        }
        buffer.flip();
        int x = buffer.getInt(0);

        Worker workerOne = new Worker(Worker.TYPE_F, x, clientChannel);
        Worker workerTwo = new Worker(Worker.TYPE_G, x, clientChannel);

        threadPool = Executors.newFixedThreadPool(2);
        threadPool.execute(workerOne);
        threadPool.execute(workerTwo);


        Thread.sleep(3000);
        clientChannel.close();
    }
}
}

class Worker implements Runnable {
public static final int TYPE_F = 1;
public static final int TYPE_G = 2;
private int x;
private int type;
private AsynchronousSocketChannel clientChannel;

public Worker(int type, int x, AsynchronousSocketChannel clientChannel) {
    this.x = x;
    this.type = type;
    this.clientChannel = clientChannel;
}

private void sendResultToClient(int res) {
    ByteBuffer buffer = ByteBuffer.allocate(32);
    if (type == TYPE_F) {
        res = 4545;
    } else {
        res = 34234;
    }
    buffer.putInt(0, type);
    buffer.putInt(4, res);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    for (int i = 0; i < 10000; ++i) {
        for (int j = 0; j < 10000; ++j) {
            int k = i*j + i/(j +1) + i + j + Math.max(i, j);
        }
    }
    boolean written = false;
    while (!written) {
        try {
            clientChannel.write(buffer);
            written = true;
        } catch (Exception e) {}
    }
}

@Override
public void run() {
    int result = -1;
    Random random = new Random();
    switch (type) {
        case TYPE_F :
            result = (int)Math.pow(x, x);
            break;
        case TYPE_G :
            result = (int)Math.pow(x, x / 2);
            break;
    }
    sendResultToClient(result);
}
}

客户:

public class Client {
private AsynchronousSocketChannel clientChannel;
ExecutorService threadPool;

public Client(int x) throws IOException, InterruptedException, ExecutionException {
    threadPool = Executors.newFixedThreadPool(2);
    boolean connected = false;
    while (!connected) {
        try {
            clientChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientChannel.connect(Server.hostAddress);
            future.get();
            connected = true;
        } catch (ExecutionException e) {}
    }
    System.out.println("Client is started: " + clientChannel.isOpen());
    System.out.println("Sending messages to server: ");
    sendArguments(x);
    //showCancelDialog();
    listenResult();
    clientChannel.close();
}



private void sendArguments(int x) throws InterruptedException, IOException {
    ByteBuffer buffer = ByteBuffer.allocate(32);
    buffer.putInt(0, x);
    Future<Integer> result = clientChannel.write(buffer);
    while (! result.isDone()) {
        System.out.println("... ");
    }
}

private boolean waitForResult(Future <Pair <Integer, Integer>> futureResult) {
    Scanner sc = new Scanner(System.in);
    while (!futureResult.isDone() ) {
        System.out.println("DOING.. break? (y/n)");
        String input = sc.next();
        if (input.equals("y")) {
            System.out.println("CANCELLED");
            threadPool.shutdownNow();
            return false;
        }
    }
    return true;
}

private void printFuncResult(Future <Pair <Integer, Integer>> futureResult) throws ExecutionException, InterruptedException {
    Integer funcType = new Integer(futureResult.get().getKey());
    Integer result = new Integer(futureResult.get().getValue());

    System.out.println("RESULT OF "+ funcType +" FUNC = " + result);
}

private void listenResult() throws ExecutionException, InterruptedException {

    System.out.println("Wating for result...");

    Listener listener = new Listener(clientChannel);

    Future <Pair <Integer, Integer>> futureResult = threadPool.submit(listener);
    if (!waitForResult(futureResult)) {
        return;
    }

    printFuncResult(futureResult);

    futureResult = threadPool.submit(listener);

    if (!waitForResult(futureResult)) {
        return;
    }

    printFuncResult(futureResult);

}
}

class Listener implements Callable <Pair <Integer, Integer>> {
private AsynchronousSocketChannel clientChannel;

public Listener(AsynchronousSocketChannel channel) {
    this.clientChannel = channel;
}

@Override
public Pair <Integer, Integer> call() throws Exception {
    ByteBuffer buffer = ByteBuffer.allocate(32);
    Future<Integer> futureResult = clientChannel.read(buffer);
    while (! futureResult.isDone()) {}
    buffer.flip();
    Integer type = new Integer(buffer.getInt(0));
    Integer result = new Integer (buffer.getInt(4));
    return new Pair<Integer, Integer>(type, result);
}
}
}

【问题讨论】:

  • @willshackleford bytes_read = -1 如果也重试 -1。所以它没有帮助。我认为服务器上 clientChannel.write() 的问题。
  • 请跟踪堆栈。

标签: java multithreading exception asynchronous


【解决方案1】:

IndexOutOfBoundsException 不是来自 printFuncResult。它只存储在将来并与堆栈跟踪一起打印。 IndexOutOfBoundsException 是在此行的 Listener 调用函数中生成的:

Integer type = new Integer(buffer.getInt(0));

如果读取没有读取足够数量的字节,就会发生这种情况。

我建议你把这个低效且难以调试的while循环替换掉。

while (! futureResult.isDone()) {}

类似

int bytes_read = futureResult.get();
if(bytes_read != 32) {
   // log error or throw exception or retry ...
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-17
    • 2020-06-14
    • 2016-05-05
    • 1970-01-01
    • 2020-07-30
    • 2018-06-15
    相关资源
    最近更新 更多