【问题标题】:Open and close channel in the gRPC client with every request每次请求时在 gRPC 客户端中打开和关闭通道
【发布时间】:2020-05-22 16:32:36
【问题描述】:

我在 kafka 应用程序中有一个 gRPC 客户端。这意味着客户端将不断打开和关闭通道。

public class UserAgentClient {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private static final Config uaparserConfig = ConfigFactory.load().getConfig(ua);
    private final ManagedChannel channel;
    private final UserAgentServiceGrpc.UserAgentServiceBlockingStub userAgentBlockingStub;

    public UserAgentParserClient() {
        this(ManagedChannelBuilder.forAddress(uaConfig.getString("host"), uaConfig.getInt("port")).usePlaintext());
    }

    public UserAgentClient(ManagedChannelBuilder<?> usePlaintext) {
        channel = usePlaintext.build();
        userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
    }

    public UserAgentParseResponse getUserAgent(String userAgent ) {
        UserAgentRequest request = UserAgentRequest.newBuilder().setUserAgent(userAgent).build();
        UserAgentParseResponse response = null;
        try {
            response = userAgentBlockingStub.parseUserAgent(request);
        } catch(Exception e) {
            logger.warn("An exception has occurred during gRPC call to the user agent.", e.getMessage());
        }
        shutdown();
        return response;
    }

    public void shutdown() {
        try {
            channel.shutdown();
        } catch (InterruptedException ie) {
            logger.warn("Interrupted exception during gRPC channel close", ie);
        }
    }
}

我想知道我是否可以一直保持频道打开?或者我每次拨打新电话时都必须打开一个频道?我想知道,因为我正在测试性能,如果我只是保持通道打开,它似乎会大大改善。另一方面,有什么我想念的吗?

【问题讨论】:

    标签: grpc kafka-consumer-api apache-kafka-streams grpc-java


    【解决方案1】:

    创建新频道的开销很大,您应该尽可能长时间地保持频道打开。

    【讨论】:

      【解决方案2】:

      由于频道的打开和关闭很昂贵,我从我的客户中完全删除了channel = usePlaintext.build(); 相反,我在我的 kafka Transformer 中打开和关闭它。在我的班级UserAgentDataEnricher 中实现了Transformer

      public class UserAgentDataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {
      
          private UserAgentParserClient userAgentParserClient;
      
          @Override
          public void init(ProcessorContext context) {
              this.context = context;
              open();
              // schedule a punctuate() method every 15 minutes
              this.context.schedule(900000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
                  close();
                  open();
                  logger.info("Re-opening of user agent channel is initialized");
              });
          }
      
          @Override
          public void close() {
              userAgentParserClient.shutdown();
          }
      
          private void open() {
              channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
              userAgentClient = new UserAgentClient(channel);
          }
      
          ...
      }
      

      现在我像这样初始化我的客户端:

      public UserAgentClient(ManagedChannel channel) {
          this.channel = channel;
          userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
      }
      

      【讨论】:

        猜你喜欢
        • 2016-05-12
        • 2020-02-10
        • 1970-01-01
        • 2021-03-25
        • 2023-03-31
        • 2018-07-27
        • 2020-06-03
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多