【问题标题】:Subscribing to Ktable topic using consumer api使用消费者 api 订阅 Ktable 主题
【发布时间】:2020-06-18 22:08:31
【问题描述】:

对于 Kafka 主题,我可以使用 confluent consumer api 订阅和接收消息。对于 Kafka Ktables,我可以使用 REST API 和 http 客户端进行订阅。所以我的问题是,也许可以不通过rest api而是通过confluent consumer api订阅Kafka表?

这就是我订阅主题的方式:

using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {

            try
            {
                consumer.Subscribe(this.TopicLookup);
                while (true)
                {
                    try
                    {

                        var cr = consumer.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                consumer.Close();
            }
        }

这就是我查询 KSQL 的方式:

using (var client = new HttpClient())
        {

            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);

            var request = new HttpRequestMessage(HttpMethod.Post, url);
            request.Method = HttpMethod.Post;
            request.Content = new System.Net.Http.StringContent("{ \"ksql\": \"select * from userstream7table;\",\"streamsProperties\": { \"ksql.streams.auto.offset.reset\": \"earliest\"}}", Encoding.UTF8, "application/vnd.ksql.v1+json");
            //request.Content.Headers.Add("Accept", "application/vnd.ksql.v1+json");
            using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
            {
                using (var body = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(body))
                    while (!reader.EndOfStream)
                        Console.WriteLine(reader.ReadLine());
            }
        }

当我使用 REST/Ksql 主题创建 Ktable 时,也会创建主题。如果我尝试订阅它,我不会收到任何消息。如果我使用 RESP API 查询这个 Ktable,我会得到所有消息。也许无法以标准方式使用这些主题?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api ksqldb


    【解决方案1】:

    该表的主题与其他任何主题一样只是一个 Kafka 主题。您可以随意使用这些数据,包括使用消费者 API。

    您可能遇到错误的原因可能是您没有设置正确的消费者配置。最可能的罪魁祸首是:

    • 不寻求话题的开始,即不将auto.offset.reset设置为earliest
    • 配置了错误的解串器。我看到您使用&lt;Ignore, string&gt; 构建消费者,即忽略键和字符串值。但是值中的数据是字符串吗?这取决于您的 value_format。

    【讨论】:

      猜你喜欢
      • 2017-07-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-16
      • 1970-01-01
      • 2018-08-14
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多