【问题标题】:Netty chunked input streamNetty 分块输入流
【发布时间】:2018-07-18 01:08:19
【问题描述】:

我在 netty 中看到了很多关于分块流的问题,但其中大多数是关于出站流的解决方案,而不是入站流。

我想了解如何从通道获取数据并将其作为 InputStream 发送到我的业务逻辑,而无需先将所有数据加载到内存中。 这就是我想要做的:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private HttpServletRequest request;
  private PipedOutputStream os;
  private PipedInputStream is;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    this.os = new PipedOutputStream();
    this.is = new PipedInputStream(os);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    this.os.close();
    this.is.close();
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf body = ((HttpContent) msg).content();

      if (body.readableBytes() > 0)
        body.readBytes(os, body.readableBytes());

      if (msg instanceof LastHttpContent) {
        os.close();
      }
    }

  }

}

然后我有另一个处理程序,它将获取我的 CustomHttpRequest 并发送到我称之为 ServiceHandler 的东西,我的业务逻辑将从 InputStream 中读取。

public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
        future = serviceHandler.handle(request, response);
...

这不起作用,因为当我的 Handler 将 CustomHttpRequest 转发给 ServiceHandler,并尝试从 InputStream 中读取时,线程被阻塞,并且我的解码器中永远不会处理 HttpContent。

我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得我在这里过于复杂了。
我查看了 ByteBufInputStream,但它说

请注意,它最多只能读取可读字节数 在施工时确定。

所以我认为它不适用于 Chunked Http 请求。另外,我看到了 ChunkedWriteHandler,这对于 Oubound 块来说似乎很好,但我找不到 ChunkedReadHandler 之类的东西......

所以我的问题是:最好的方法是什么?我的要求是:

- 在发送 ServiceHandlers 之前不要将数据保存在内存中;
- ServiceHandlers API 应该与 netty 无关(这就是为什么我使用我的 CustomHttpRequest,而不是 Netty 的 HttpRequest);

更新 我已经在 CustomHttpRequest 上使用更具反应性的方法来实现这一点。现在,请求没有向 ServiceHandlers 提供 InputStream 以便它们可以读取(这是阻塞的),但是 CustomHttpRequest 现在有一个返回 Future 的readInto(OutputStream) 方法,并且所有服务处理程序都将在此时执行输出流已满。这是它的样子

public class CustomHttpRequest {
  ...constructors and other methods hidden...
  private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();

  private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();

  private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);

  public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
    outputStreamFuture.set(os);
    return this.writeCompleteFuture;
  }

  ListenableFuture<Void> writeChunk(byte[] buf) {
    this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
      outputStreamFuture.get().write(buf);
      return Futures.immediateFuture(null);
    });
    return lastWriteFuture;
  }


  void complete() {
    ListenableFuture<Void> future =
        Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
          outputStreamFuture.get().close();
          return Futures.immediateFuture(null);
        });
    addFinallyCallback(future, () -> {
      this.writeCompleteFuture.set(null);
    });

  }
}

我更新后的 ServletRequestHandler 看起来像这样:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private NettyHttpServletRequestAdaptor request;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
  }


  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;

      this.request = new CustomHttpRequest(request, ctx.channel());

      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf buf = ((HttpContent) msg).content();
      byte[] bytes = new byte[buf.readableBytes()];
      buf.readBytes(bytes);

      this.request.writeChunk(bytes);

      if (msg instanceof LastHttpContent) {
        this.request.complete();
      }
    }
  }
}

这工作得很好,但请注意,这里的一切都是在一个线程中完成的,也许对于大数据,我可能想要生成一个新线程来释放该线程以用于其他通道。

【问题讨论】:

  • 常规 InputStream 或 OutputStream 不起作用吗? PipedInput/OutputStream 在文档中说“不建议尝试使用单个线程中的两个对象,因为它可能会使线程死锁”。
  • 好吧,你对死锁的看法是对的……但就目前而言,我的解决方案意味着我的 ServiceHandler 需要另一个线程,在这种情况下,它们将用于不同的线程。但是我的ServiceHandler上需要一个InputStream,而我相信我需要以某种方式在我的netty处理程序上写入这个InputStream,而我发现这样做的方法是使用Piped ...其他方法是使用ByteArrayInputStream,但我d 有内存问题或 ByteBuffInputStream,对此我有问题中提到的问题。还有其他想法吗?
  • 我会说试试 BufferedInput/OutputStreams。
  • 我也担心ServerRequestHandler。除非根据请求创建此类,否则一旦收到多个请求,此操作就会失败。
  • @aglassman 我看不出 BuffereInputStream 在这里有什么帮助。

标签: java multithreading stream netty inbound


【解决方案1】:

您在正确的轨道上 - 如果您的 serviceHandler.handle(request, response); 调用正在执行阻塞读取,您需要为其创建一个新线程。记住,应该只有少量的 Netty 工作线程,所以你不应该在工作线程中做任何阻塞调用。

要问的另一个问题是,您的服务处理程序需要阻塞吗?它有什么作用?如果它无论如何都要通过网络铲起数据,你能以非阻塞的方式将它合并到 Netty 管道中吗?这样,一切都是异步的,不需要阻塞调用和额外的线程。

【讨论】:

  • 感谢您的回复。我刚刚用服务处理程序在阅读时不会阻塞的解决方案更新了我的问题。但我很好奇你所说的“为它创建一个 Netty 管道”是什么意思。
  • 抱歉,不清楚。刚刚编辑为“您可以合并到 netty 管道中吗”。基本上,我的观点是,只有当您与以这种方式工作的库交互时,您才需要使用 InputStream 进行阻塞读取。如果您完全控制您的应用程序,只需使用 Netty 方式的通道/处理程序。
  • 这是将现有服务器迁移到 netty 的一部分。该服务器允许开发人员创建自己的服务处理程序。考虑到这一点,我们更愿意通过创建一个 netty 处理程序作为服务处理程序的 Dispatcher 来保持服务处理程序 API netty 不可知。因此,将每个服务处理程序添加到 netty 管道对我来说并不是一个好选择。
猜你喜欢
  • 1970-01-01
  • 2014-09-20
  • 1970-01-01
  • 2010-09-24
  • 1970-01-01
  • 1970-01-01
  • 2023-03-23
  • 2020-07-04
  • 1970-01-01
相关资源
最近更新 更多