【问题标题】:Is there a way to consume a Kafka Ksql Push query from .NET有没有办法从 .NET 使用 Kafka Ksql Push 查询
【发布时间】:2020-09-04 20:35:19
【问题描述】:

我目前正在使用 Kafka 消费者在 .NET 中处理大量 Kafka 消息。

我处理的第 1 步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。

我不想首先处理(特别是不下载)那些不需要的消息。

看起来像一个 kSql 查询——写成一个推送查询——可以有效地过滤掉我需要处理的消息。

我怎样才能通过 .NET 使用这些?我看到一些文档提到了 REST API,但我怀疑这是一个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以选择性地下载和处理消息,我只会正在处理大约三分之一的当前音量。)

很遗憾,我无法控制发布者,因此我无法更改发布消息的内容/方式。

【问题讨论】:

    标签: c# .net apache-kafka ksqldb


    【解决方案1】:

    是的,你可以使用 ksqlDB 来做到这一点

    -- Declare a stream on the source topic
    -- Because it's JSON you'll need to specify the schema
    CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
      WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
    
    -- Apply the filter to the stream, with the results written
    -- to a new stream (backed by a new topic)
    CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
      SELECT * FROM my_source WHERE COL1='FOO';
    

    然后在您的应用程序中使用the REST API 运行推送查询,该查询将仅使用过滤后的消息:

    SELECT * FROM target EMIT CHANGES;
    

    除了 ksqlDB,您可能还想看看这个社区最近发布的项目:https://github.com/LGouellec/kafka-streams-dotnet

    【讨论】:

    • @Robbin Moffatt。谢谢。我看到了你提到的流项目,但看起来还很早。仍然 - 值得一试。您认为通过 REST API 高速处理大量数据真的可行吗?这对我来说似乎有点狡猾 - 但我不是这方面的专家。 (如果我必须进行大量 REST 调用,我主要担心速度)
    【解决方案2】:

    您可以通过以下方式使用 ksqldb Linq 提供程序。

    使用 Nuget 包管理器安装包:

    Install-Package ksqlDB.RestApi.Client
    

    使用 C# (.NET) 创建查询:

    var ksqlDbUrl = @"http:\\localhost:8088";
    var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
          
    await using var context = new KSqlDBContext(contextOptions);
    
    using var subscription = context.CreateQueryStream<Message>() //stream name
      .Where(p => p.RowTime >= 1510923225000) // add your own conditions
      //....
      .Select(l => new { l.Id, l.Message, l.RowTime })
      .Subscribe(onNext: message =>
      {
      }, onError: error => {  }, onCompleted: () => { });
    

    上面的C#代码等价于下面的ksql:

    SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;
    

    项目Wiki 更多运营商。

    【讨论】:

      猜你喜欢
      • 2020-01-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-10
      • 1970-01-01
      • 1970-01-01
      • 2023-01-20
      • 2019-07-06
      相关资源
      最近更新 更多