【问题标题】:AsynchronousSocketChannel Read/WritePendingException - possible to synchronize?AsynchronousSocketChannel Read/WritePendingException - 可以同步吗?
【发布时间】:2020-08-22 04:30:41
【问题描述】:

我正在做一个 TCP 服务器,我很好奇是否可以同步 AsynchronousSocketChannel 的读写方法。我将频道包装到另一个类中,因为我的频道需要一些额外的功能。我的问题是这是否真的是同步这个的正确方法:

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (buf.hasRemaining())
                channel.write(buf, buf, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        this.channel.write(buffer, buffer, handler);
    }
}

在这种情况下,writeLock 是一个 static final 对象,当我的包装类的任意实例开始写入操作时,它会获取锁。这真的有效还是只是用完了同步块?

【问题讨论】:

  • 写入是异步的,所以它只是掉出synchronized 块,因此是徒劳的。在完成处理程序运行之前,您无法安排更多的写入。听起来你需要一个写队列。不过,您不需要同步读取与写入,您可以同时安排它们。
  • 谢谢,我确实能够通过写入队列和队列上的一些同步来修复它:)

标签: java sockets asynchronous io channel


【解决方案1】:

这就是我修复它的方法:

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (buf.hasRemaining()) {
                channel.write(buf, buf, this);
                return;
            }

            synchronized (writeLock) {
                if (!writeQueue.isEmpty()) {
                    while (writePending)
                        ;

                    ByteBuffer writeBuf = writeQueue.pop();
                    channel.write(writeBuf, writeBuf, this);
                    writePending = true;
                    return;
                }
            }

            writePending = false;
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            writePending = false;
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        while (this.writePending)
            ;

        this.writeQueue.push(buffer);

        ByteBuffer writeBuffer = this.writeQueue.pop();
        this.channel.write(writeBuffer, writeBuffer, handler);
        this.writePending = true;
    }
}

【讨论】:

    猜你喜欢
    • 2015-07-29
    • 2014-05-06
    • 1970-01-01
    • 1970-01-01
    • 2011-01-26
    • 1970-01-01
    • 2023-03-16
    • 2021-08-03
    • 2011-09-29
    相关资源
    最近更新 更多