【问题标题】:Does Kafka API and/or protocol offer a way to query for server properties?Kafka API 和/或协议是否提供查询服务器属性的方法?
【发布时间】:2019-03-28 01:39:27
【问题描述】:

我正在编写一个 Kafka 生产者,它有时会发送一个请求,其中包含超过最大允许请求大小的一批消息。似乎我无法直接访问要向其发送消息的 Kafka 集群的服务器属性,并且我还没有找到一种方法来查询服务器以获取在 server.properties 文件中设置的值.

示例

尝试发送过大的消息会触发 Kafka 日志说...

11:47:37 kafka.1     | Topic and partition to exceptions: 
page-visits-0 -> org.apache.kafka.common.errors.RecordTooLargeException 
(kafka.server.KafkaApis)

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    您可以使用KafkaAdminClient API 获取集群信息。它可以提供代理级别以及主题级别的信息。下面的代码将为每个节点提供服务器配置。

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.DescribeClusterResult;
    import org.apache.kafka.clients.admin.DescribeConfigsResult;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.ConfigResource.Type;
    
    
    public class ListTopics {
    
        public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers","localhost:9092");
            AdminClient admin = AdminClient.create(prop);
            DescribeClusterResult describeClusterResult = admin.describeCluster();
            List<Node> nodes = new ArrayList<>(describeClusterResult.nodes().get());
            // Pass the broker node ID here. You can use for loop in case of multiple broker nodes.
            ConfigResource resource = new ConfigResource(Type.BROKER, String.valueOf(nodes.get(0).id()));
    
            DescribeConfigsResult configs = admin.describeConfigs(Collections.singletonList(resource));
            Map<ConfigResource, Config> config = configs.all().get();
            System.out.println(config   );
        }
    }
    

    附:此 API 仅可用于 Kafka 0.11 及更高版本的安装。

    【讨论】:

      【解决方案2】:

      假设您的集群至少运行 Kafka 0.11,您可以使用 AdminClient describeConfigs() API 来检索代理配置。

      例如:

      Properties configs = new Properties();
      configs.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient client = AdminClient.create(configs);
      
      List<ConfigResource> resources = Arrays.asList(new ConfigResource(Type.BROKER, "0"));
      DescribeConfigsResult dcr = client.describeConfigs(resources);
      for (Map.Entry<ConfigResource, Config> entry : dcr.all().get().entrySet()) {
          System.out.println(entry.getKey() + " - " + entry.getValue());
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2014-09-04
        • 2012-05-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多