【Question title】:How to call the server using the InputStream of a MethodDescriptor in gRPC?
【Posted】:2021-08-11 23:14:31
【Question description】:

I want to call a gRPC server from the client through an InputStream-based MethodDescriptor, but I have not been able to get it working. Here is my code:

Environment
JDK 1.8
gRPC 1.33.1

The version of the grpc-all dependency is:

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-all</artifactId>
    <version>1.33.1</version>
    <scope>provided</scope>
</dependency>

On the server, my EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase and overrides the echo RPC method. Before building the server, I also export an InputStream version of the service.

Server
// 1. Implement the service
@Service
public class EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase {

    @Override
    public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
        System.out.println("Received: " + request.getMessage());
        EchoResponse.Builder response = EchoResponse.newBuilder()
                .setMessage("ReceivedHELLO");
        responseObserver.onNext(response.build());
        responseObserver.onCompleted();
    }
}

// 2. Export the InputStream service
    private void exportService(final Object bean) {
        BindableService bindableService = (BindableService) bean;
        ServerServiceDefinition serviceDefinition = bindableService.bindService();
        try {
            ServerServiceDefinition isDefinition = ServerInterceptors.useInputStreamMessages(serviceDefinition);//inputstream

            serviceDefinitions.add(serviceDefinition);
            serviceDefinitions.add(isDefinition);
        } catch (Exception e) {
            log.error("export service is fail", e);
        }
    }

// 3. Start the gRPC server
    private void startGrpcServer() {
        ServerBuilder<?> serverBuilder = grpcServerBuilder.buildServerBuilder();
        List<ServerServiceDefinition> serviceDefinitions = grpcClientBeanPostProcessor.getServiceDefinitions();
        for (ServerServiceDefinition serviceDefinition : serviceDefinitions) {
            serverBuilder.addService(serviceDefinition);        
        }

        try {
            Server server = serverBuilder.build().start();
            log.info("Grpc server started successfully");
        } catch (IOException e) {
            log.error("Grpc server failed to start", e);
        }
    }

On the client, I created an InputStream MethodDescriptor whose request marshaller works on InputStream, but I have not managed to get it working yet. Can anyone offer some advice? Thanks!

Client
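import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;

public class ClientDemo {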

    private static Map<String,Object> methodDescriptorCache = Maps.newHashMap();
    //  InputStream marshaller  
    final static MethodDescriptor.Marshaller<InputStream> marshaller = new MethodDescriptor.Marshaller<InputStream>() {
        @Override
        public InputStream stream(final InputStream value) {
            return value;
        }

        @Override
        public InputStream parse(final InputStream stream) {
            if (stream.markSupported()) {
                return stream;
            } else {
                return new BufferedInputStream(stream);
            }
        }
    };


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.build channel
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();

        //2. create InputStream MethodDescriptor
        MethodDescriptor<InputStream, InputStream> echo = createInputStreamMethodDescriptor("echo.EchoService", "echo");

        //3. channel.newCall()
        ClientCall<InputStream, InputStream> call =
                channel.newCall(echo, CallOptions.DEFAULT);

        //4. input params
        String req = "{\"message\":\"input stream\"}";
        InputStream request = new ByteArrayInputStream(req.getBytes(StandardCharsets.UTF_8));
        System.out.println(request);
        
        //5. get result
        ListenableFuture<InputStream> res = ClientCalls.futureUnaryCall(call, request);
        System.out.println(res.get());
    }

    public static io.grpc.MethodDescriptor<InputStream, InputStream> createInputStreamMethodDescriptor(String clazzName, String methodName) {
        io.grpc.MethodDescriptor<InputStream, InputStream> methodDescriptor = (MethodDescriptor<InputStream, InputStream>) methodDescriptorCache
                .get(clazzName + methodName);
        if (methodDescriptor != null)
            return methodDescriptor;
        else {
            methodDescriptor = io.grpc.MethodDescriptor.<InputStream, InputStream> newBuilder()
                    .setType(MethodDescriptor.MethodType.UNARY)//
                    .setFullMethodName(io.grpc.MethodDescriptor.generateFullMethodName(clazzName, methodName))//
                    .setRequestMarshaller(marshaller)//
                    .setResponseMarshaller(marshaller)//
                    .setSafe(false)//
                    .setIdempotent(false)//
                    .build();
            methodDescriptorCache.put(clazzName + methodName, methodDescriptor);
            return methodDescriptor;
        }
    }
}

Exception message
 [grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler - [id: 0xa764352c, L:/127.0.0.1:55783 - R:localhost/127.0.0.1:8080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 2] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
Exception in thread "main" java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:564)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:545)
    at org.apache.shenyu.examples.grpc.test.ClientDemo2.main(ClientDemo2.java:56)
Caused by: io.grpc.StatusRuntimeException: UNKNOWN
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1
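
One possible cause, offered as a hedged reading rather than a confirmed diagnosis: `ServerInterceptors.useInputStreamMessages` changes the message type that flows through the server to `InputStream`, but the wrapped handler still converts each request back to the original type (`EchoRequest`) with the original protobuf marshaller. The bytes inside the stream would therefore need to be a protobuf-encoded message, whereas the client above sends JSON text. The JDK-only sketch below shows how different the two byte sequences are. It assumes, hypothetically, that `EchoRequest` is declared with `string message = 1;` (the .proto file is not shown in the question); `WireFormatSketch` and `encodeStringField1` are illustrative names, not part of gRPC.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class WireFormatSketch {

    // Encodes a message whose only populated field is `string ... = 1;`
    // (assumed layout, see the note above).
    static byte[] encodeStringField1(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0A);                 // tag: (field number 1 << 3) | wire type 2
        int len = utf8.length;
        while ((len & ~0x7F) != 0) {     // length as a varint, 7 bits per byte
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(utf8, 0, utf8.length); // the UTF-8 payload itself
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // What the client in the question sends: JSON text.
        byte[] json = "{\"message\":\"input stream\"}".getBytes(StandardCharsets.UTF_8);
        // What a protobuf parser expects for the assumed message layout.
        byte[] proto = encodeStringField1("input stream");

        System.out.println("JSON:     " + json.length + " bytes, first byte 0x"
                + Integer.toHexString(json[0] & 0xFF));
        System.out.println("protobuf: " + proto.length + " bytes, first byte 0x"
                + Integer.toHexString(proto[0] & 0xFF));

        // This stream, not the JSON one, is the kind of request body to pass to the call.
        InputStream request = new ByteArrayInputStream(proto);
        System.out.println("request stream holds " + request.available() + " bytes");
    }
}
```

With the generated classes on the classpath, hand-encoding should not be necessary: wrapping `EchoRequest.newBuilder().setMessage("input stream").build().toByteArray()` in a `ByteArrayInputStream` would give the same kind of payload.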

【Question discussion】:

    Tags: java grpc


    【Solution 1】:

    【Discussion】:

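    • Thanks for your answer. However, fetching the descriptor through reflection and then calling the server results in two network requests, which could be time-consuming. That is what I want to optimize.
    • @midnight2104 You can refer to the method generateMethodDescriptor. The reflection request is only there to obtain the Marshaller; if you already know the marshalled type, reflection is not needed.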
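
To illustrate the last comment: when the message type is known in advance, the bytes of an `InputStream` response can be interpreted directly, with no reflection round-trip. Note that printing the stream itself, as `System.out.println(res.get())` does in the client, shows only the object's identity, not its content. The JDK-only sketch below drains a stream and decodes it under the assumption that `EchoResponse` is declared with `string message = 1;` (hypothetical, since the .proto file is not shown). `ResponseDecodeSketch`, `readFully` and `decodeStringField1` are illustrative names, and the decoder handles only that single-field layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ResponseDecodeSketch {

    // Drains the stream into a byte array (Java 8 has no InputStream.readAllBytes).
    static byte[] readFully(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        try {
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decodes a message consisting of exactly one `string ... = 1;` field (assumed layout).
    static String decodeStringField1(byte[] data) {
        if (data.length == 0) {
            return "";                    // proto3 omits empty strings on the wire
        }
        if (data[0] != 0x0A) {
            throw new IllegalArgumentException("unexpected tag byte: " + data[0]);
        }
        int pos = 1;
        int len = 0;
        int shift = 0;
        while (true) {                    // read the varint-encoded length
            int b = data[pos++] & 0xFF;
            len |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return new String(data, pos, len, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the InputStream returned by the call:
        // tag 0x0A, length 13, then the UTF-8 bytes of "ReceivedHELLO".
        byte[] payload = "ReceivedHELLO".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write(0x0A);
        wire.write(payload.length);
        wire.write(payload, 0, payload.length);
        InputStream response = new ByteArrayInputStream(wire.toByteArray());

        System.out.println(decodeStringField1(readFully(response)));
    }
}
```

With the generated classes available, `EchoResponse.parseFrom(inputStream)` would be the usual way to do the same thing for any field layout.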