【问题标题】:How do I create a file sending client/server with RSocket?如何使用 RSocket 创建文件发送客户端/服务器?
【发布时间】:2018-04-27 11:31:00
【问题描述】:

我似乎在RSocket 上找不到任何资源/教程,除了在 GitHub 上阅读他们的代码,我不明白。

我的服务器上有一个文件路径:String serverFilePath;

我希望能够从我的客户端下载它(最好使用RSocket's Aeron implementation)。有谁知道如何使用 RSocket 做到这一点?

提前致谢。

【问题讨论】:

    标签: java download file-sharing aeron rsocket


    【解决方案1】:

    我从事 RSocket 工作,并编写了大部分 java 版本,包括 Aeron 传输。

    我目前不建议使用 Aeron 实现。您可以通过以下几种方式发送文件:

    1. 使用 requestChannel 将数据推送到远程服务器。
    2. 使用 requestChannel 或 requestStream 将字节流式传输到客户端。

    这是一个使用 requestStream 的示例:

      public class FileCopy {
    
      public static void main(String... args) throws Exception {
    
        // Create a socket that receives incoming connections
        RSocketFactory.receive()
            .acceptor(
                new SocketAcceptor() {
                  @Override
                  // Create a new socket acceptor
                  public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                    return Mono.just(
                        new AbstractRSocket() {
                          @Override
                          public Flux<Payload> requestStream(Payload payload) {
                            // Get the path of the file to copy
                            String path = payload.getDataUtf8();
                            SeekableByteChannel _channel = null;
    
                            try {
                              _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                            } catch (IOException e) {
                              return Flux.error(e);
                            }
    
                            ReferenceCountUtil.safeRelease(payload);
    
                            SeekableByteChannel channel = _channel;
                            // Use Flux.generate to create a publisher that returns file at 1024 bytes
                            // at a time
                            return Flux.generate(
                                sink -> {
                                  try {
                                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                                    int read = channel.read(buffer);
                                    buffer.flip();
                                    sink.next(DefaultPayload.create(buffer));
    
                                    if (read == -1) {
                                      channel.close();
                                      sink.complete();
                                    }
                                  } catch (Throwable t) {
                                    sink.error(t);
                                  }
                                });
                          }
                        });
                  }
                })
            .transport(TcpServerTransport.create(9090))
            .start()
            .subscribe();
    
        String path = args[0];
        String dest = args[1];
    
        // Connect to a server
        RSocket client =
            RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();
    
        File f = new File(dest);
        f.createNewFile();
    
        // Open a channel to a new file
        SeekableByteChannel channel =
            Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    
        // Request a stream of bytes
        client
            .requestStream(DefaultPayload.create(path))
            .doOnNext(
                payload -> {
                  try {
                    // Write the bytes received to the new file
                    ByteBuffer data = payload.getData();
                    channel.write(data);
    
                    // Release the payload
                    ReferenceCountUtil.safeRelease(payload);
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
                })
            // Block until all the bytes are received
            .blockLast();
    
        // Close the file you're writing too
        channel.close();
      }
    }
    

    【讨论】:

      【解决方案2】:

      【讨论】:

        猜你喜欢
        • 2020-08-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-06-25
        • 2013-08-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多