【发布时间】:2018-07-18 01:08:19
【问题描述】:
我在 netty 中看到了很多关于分块流的问题,但其中大多数是关于出站流的解决方案,而不是入站流。
我想了解如何从通道获取数据并将其作为 InputStream 发送到我的业务逻辑,而无需先将所有数据加载到内存中。
这就是我想要做的:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private HttpServletRequest request;
private PipedOutputStream os;
private PipedInputStream is;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.os = new PipedOutputStream();
this.is = new PipedInputStream(os);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
this.os.close();
this.is.close();
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf body = ((HttpContent) msg).content();
if (body.readableBytes() > 0)
body.readBytes(os, body.readableBytes());
if (msg instanceof LastHttpContent) {
os.close();
}
}
}
}
然后我有另一个处理程序,它将获取我的 CustomHttpRequest 并发送到我称之为 ServiceHandler 的东西,我的业务逻辑将从 InputStream 中读取。
public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
@Override
public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
future = serviceHandler.handle(request, response);
...
这不起作用,因为当我的 Handler 将 CustomHttpRequest 转发给 ServiceHandler,并尝试从 InputStream 中读取时,线程被阻塞,并且我的解码器中永远不会处理 HttpContent。
我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得我在这里过于复杂了。
我查看了 ByteBufInputStream,但它说
请注意,它最多只能读取可读字节数 在施工时确定。
所以我认为它不适用于 Chunked Http 请求。另外,我看到了 ChunkedWriteHandler,这对于 Oubound 块来说似乎很好,但我找不到 ChunkedReadHandler 之类的东西......
所以我的问题是:最好的方法是什么?我的要求是:
- 在发送 ServiceHandlers 之前不要将数据保存在内存中;
- ServiceHandlers API 应该与 netty 无关(这就是为什么我使用我的 CustomHttpRequest,而不是 Netty 的 HttpRequest);
更新
我已经在 CustomHttpRequest 上使用更具反应性的方法来实现这一点。现在,请求没有向 ServiceHandlers 提供 InputStream 以便它们可以读取(这是阻塞的),但是 CustomHttpRequest 现在有一个返回 Future 的readInto(OutputStream) 方法,并且所有服务处理程序都将在此时执行输出流已满。这是它的样子
public class CustomHttpRequest {
...constructors and other methods hidden...
private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();
private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();
private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);
public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
outputStreamFuture.set(os);
return this.writeCompleteFuture;
}
ListenableFuture<Void> writeChunk(byte[] buf) {
this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
outputStreamFuture.get().write(buf);
return Futures.immediateFuture(null);
});
return lastWriteFuture;
}
void complete() {
ListenableFuture<Void> future =
Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
outputStreamFuture.get().close();
return Futures.immediateFuture(null);
});
addFinallyCallback(future, () -> {
this.writeCompleteFuture.set(null);
});
}
}
我更新后的 ServletRequestHandler 看起来像这样:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private NettyHttpServletRequestAdaptor request;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
this.request = new CustomHttpRequest(request, ctx.channel());
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf buf = ((HttpContent) msg).content();
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
this.request.writeChunk(bytes);
if (msg instanceof LastHttpContent) {
this.request.complete();
}
}
}
}
这工作得很好,但请注意,这里的一切都是在一个线程中完成的,也许对于大数据,我可能想要生成一个新线程来释放该线程以用于其他通道。
【问题讨论】:
-
常规 InputStream 或 OutputStream 不起作用吗? PipedInput/OutputStream 在文档中说“不建议尝试使用单个线程中的两个对象,因为它可能会使线程死锁”。
-
好吧,你对死锁的看法是对的……但就目前而言,我的解决方案意味着我的 ServiceHandler 需要另一个线程,在这种情况下,它们将用于不同的线程。但是我的ServiceHandler上需要一个InputStream,而我相信我需要以某种方式在我的netty处理程序上写入这个InputStream,而我发现这样做的方法是使用Piped ...其他方法是使用ByteArrayInputStream,但我d 有内存问题或 ByteBuffInputStream,对此我有问题中提到的问题。还有其他想法吗?
-
我会说试试 BufferedInput/OutputStreams。
-
我也担心ServerRequestHandler。除非根据请求创建此类,否则一旦收到多个请求,此操作就会失败。
-
@aglassman 我看不出 BuffereInputStream 在这里有什么帮助。
标签: java multithreading stream netty inbound