【问题标题】:Java: gRPC with FutureStub and ListenableFutureJava:带有 FutureStub 和 ListenableFuture 的 gRPC
【发布时间】:2021-03-29 15:28:04
【问题描述】:

我正在使用 Java 学习 gRPC,例如,我定义了三种 request 类型(cuboid球体圆柱体)和一个单一的响应类型(字符串),我在其中放置有关特定几何体的计算体积的消息。我遵循this 示例,它在客户端使用阻塞存根并且程序正确运行。但是,我想尝试异步方法,所以这是我的客户端代码,用newFutureStubListenableFuture 编写:

public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost",8080)
                .usePlaintext()
                .build();

        GeometryServiceGrpc.GeometryServiceFutureStub stub = GeometryServiceGrpc.newFutureStub(channel);

         ListenableFuture<Response> cuboidResp = stub.calcCuboidVol(CuboidVolumeRequest.newBuilder()
                .setLength(2)
                .setWidth(3)
                .setHeight(4)
                .build());
         cuboidResp.addListener(() -> {
             try {
                 System.out.println(cuboidResp.get().getResponse());
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }, command -> {
             command.run();
         });

        ListenableFuture<Response> sphereResp = stub.calcSphereVol(SphereVolumeRequest.newBuilder()
                .setRadius(2)
                .build());
        sphereResp.addListener(() -> {
            try {
                System.out.println(sphereResp.get().getResponse());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }, command -> {
            command.run();
        });

        ListenableFuture<Response> cylinderResp = stub.calcCylinderVol(CylinderVolumeRequest.newBuilder()
                .setRadius(2)
                .setHeight(3)
                .build());
        cylinderResp.addListener(() -> {
            try {
                System.out.println(cylinderResp.get().getResponse());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }, command -> {
            command.run();
        });

        channel.shutdown();
}

由于我们必须使用ListenableFuture,因此我将其方法addListener() 附加到特定几何方法的每个返回实例(Response 消息有一个名为string 的单个string 字段response)。基本上,所需的结果只是在计算准备好后打印计算,这就是为什么在 runnable 部分中调用 System.out.println 的原因。尽管程序成功执行,但没有打印任何内容。但是如果检查一个特定的几何图形,那么所有结果都会出现在控制台上。示例:

        if(!cylinderResp.isDone()) {
           try {
                cylinderResp.get().getResponse();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

这是否意味着程序在异步部分完成之前终止?并且可执行部分(commnad)是否正确编写(我搜索并发现如果我们希望代码在不同的线程上运行,这部分可以由ExecutorService执行 - 在我的情况下不是必需的)?

【问题讨论】:

  • 一般来说,你不应该直接使用addListener,而是使用例如Futures.transform 或类似的添加后续Futures,然后您可以等待完成。
  • 谢谢,我不知道。非常有趣的是,为什么在只等待一个 ListenableFuture(第二个代码 sn-p)之后,所有三个 runnable 部分都会被执行。

标签: java grpc grpc-java


【解决方案1】:

Does this means that the program terminates before the asynchronous part is completed?是的,没错,你应该等待它通过调用cylinderResp.get()channel.awaitTermination(5, TimeUnit.SECONDS);执行完成

And is the executable part (commnad) written correctly,如果你想在指定线程池中运行任务,你应该添加一个ExecutorService,否则它将默认线程池执行。

对于完全异步,您应该使用StreamObserver

有关如何在 Java 中使用 gRPC 的更多详细信息,您可以参考我的代码 helloworlde/grpc-java-sample

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-10
    • 2021-08-01
    • 1970-01-01
    相关资源
    最近更新 更多