【发布时间】:2019-10-17 06:16:54
【问题描述】:
我将AsyncHttpClient library 用于异步非阻塞请求。 我的情况:通过网络接收数据时将数据写入文件。
对于从远程主机下载文件并保存到文件,我使用默认的ResponseBodyPartFactory.EAGER 和AsynchronousFileChannel,以免在数据到达时阻塞 netty 线程。但正如我的测量结果所示,与LAZY 相比,Java 堆中的内存消耗增加了很多倍。
所以我决定直接去LAZY,但没有考虑文件的后果。
此代码将有助于重现问题。:
public static class AsyncChannelWriter {
private final CompletableFuture<Integer> startPosition;
private final AsynchronousFileChannel channel;
public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
this.channel = channel;
this.startPosition = CompletableFuture.completedFuture((int) channel.size());
}
public CompletableFuture<Integer> getStartPosition() {
return startPosition;
}
public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {
return currentPosition.thenCompose(position -> {
CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
writenBytes.complete(result);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
writenBytes.completeExceptionally(exc);
}
});
return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
});
}
public void close(CompletableFuture<Integer> currentPosition) {
currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
}
}
public static void main(String[] args) throws IOException {
final String filepath = "/media/veracrypt4/files/1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>() {
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
return State.CONTINUE;
}
@Override
public Response onCompleted(Response response) {
asyncChannelWriter.close(atomicReferencePosition.get());
return response;
}
});
}
在这种情况下,图片已损坏。但是如果我使用FileChannel 而不是AsynchronousFileChannel,在这两种情况下,文件都会正常显示。使用DirectByteBuffer(如果使用LazyResponseBodyPart.getBodyByteBuffer())和AsynchronousFileChannel 时是否有任何细微差别?
如果EAGER 一切正常,我的代码可能有什么问题?
更新
我注意到,如果我使用LAZY,例如,我添加了行
Thread.sleep (10)在方法onBodyPartReceived中,像这样:
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
Thread.sleep(10);
return State.CONTINUE;
}
文件以非损坏状态保存到磁盘。
据我了解,原因是在这 10 毫秒内,AsynchronousFileChannel 中的异步线程设法从这个DirectByteBuffer 向磁盘写入数据。
结果文件被破坏是因为这个异步线程使用这个缓冲区与netty线程一起写入。
如果我们用EagerResponseBodyPart查看源代码,那么我们将看到以下内容
private final byte[] bytes;
public EagerResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
bytes = byteBuf2Bytes(buf);
}
@Override
public ByteBuffer getBodyByteBuffer() {
return ByteBuffer.wrap(bytes);
}
因此,当一条数据到达时,它立即存储在字节数组中。然后我们可以安全地将它们包装在 HeapByteBuffer 中,并在文件通道中传输到异步线程。
但是如果你看代码LazyResponseBodyPart
private final ByteBuf buf;
public LazyResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
this.buf = buf;
}
@Override
public ByteBuffer getBodyByteBuffer() {
return buf.nioBuffer();
}
如你所见,我们实际上通过方法调用nioBuffer在异步文件通道线程nettyByteBuff(在这种情况下总是PooledSlicedByteBuf)使用@
在这种情况下我该怎么办,如何在异步线程中安全地传递 DirectByteBuffer 而无需将缓冲区复制到 Java 堆?
【问题讨论】:
-
为什么不使用
BodyDeferringAsyncHandler让生活更简单? -
@MạnhQuyếtNguyễn 因为它无效?我使用这个客户端来减少内存消耗和 CPU 资源。对于简单的生活,我可以使用 apache 同步客户端。顺便说一句,
BodyDeferringAsyncHandler在内存消耗方面与我使用EAGER的示例没有什么不同,因为BodyDeferringAsyncHandler使用getBodyPartBytes方法。我不确定,但可能在使用BodyDeferringAsyncHandler时,线程会在写入OutputStream时阻塞。 -
仅供参考:调用
client.prepareGet(downloadUrl).execute的线程未被阻塞。保持简单 -
@MạnhQuyếtNguyễn 当然可以,但是处理数据的线程会被阻塞。
-
总有一个线程被阻塞:真正写数据的那个
标签: java asynchronous nio asynchttpclient nio2