【问题标题】:Applying Multiple Filters + Write to Multiple Topics in a Loop on Kafka Streams在 Kafka 流的循环中应用多个过滤器 + 写入多个主题
【发布时间】:2018-08-21 17:29:49
【问题描述】:

我有一个过滤器列表(其中 schema_field='val')和相应主题的要求。我需要遍历这些过滤器列表并应用它们,然后使用 KStreams 将过滤后的记录值写入其特定主题。有这样的功能吗?

例子:

synchronized (subscriberFilterRequirements) {
    Iterator<SubscriberFilterRequirements> itr = subscriberFilterRequirements.iterator();
    while (itr.hasNext()) {
        SubscriberFilterRequirements req = itr.next();
        log.info("*** Applying transformations on record");
        KStream<String, GenericRecord> subscriberFilteredRecord = filteredRecord;
        if (req.getPipelineSubscriptions().getFiltersql() != null && !req.getPipelineSubscriptions().getFiltersql().isEmpty()) {
            subscriberFilteredRecord = filteredRecord.filter((key, value) -> {
                String[] filter = req.getPipelineSubscriptions().getFiltersql().trim().split("=");
                return value.get(filter[0]).toString().equalsIgnoreCase(filter[1]);
            })
         }
        Schema schema = Utils.getAvroSchema(req.getPipelineSubscriptions().getSubscriberSchemaLocation(),
                    req.getPipelineSubscriptions().getSubscriberSchemaLocationType());
        GenericRecord sinkRecord = new GenericData.Record(schema);
        List<Schema.Field> schemaFieldsList = schema.getFields();
        Iterator<Schema.Field> sinkIterator = schemaFieldsList.iterator();
        subscriberFilteredRecord.map((key, value) -> {
            fillAvroRecord(sinkRecord, sinkIterator, value);
            return new KeyValue<>(key, sinkRecord);
        }).to(req.getPipelineSubscriptions().getKafkaTopic());
    }
}

目前,正在发生的事情是,循环的上下文和 KStream 的上下文不一样。开始流式传输时,循环第一次执行良好,即 KStream 接收第一个过滤器,从那时起,KStream 像无限循环一样运行,而不使用第二个过滤器。我想注入其余的过滤器,一个接一个地应用到记录上。

【问题讨论】:

  • 您的代码示例应该可以工作。当然,您可能希望在to() 操作中使用不同的主题名称。您可以先将每个过滤器映射到相应的输出主题,然后将 forEach 应用于 filter-output-topic-pairs 以使每个过滤器具有不同的输出主题。
  • 以下示例:filteredRecord.filter((key, value) -&gt; { log.info("Applying filter() for record with key... " + key.trim()); String[] filter = req.getPipelineSubscriptions().getFiltersql().trim().split("="); return value.get(filter[0]).toString().equalsIgnoreCase(filter[1]); }).map((key, value) -&gt; { log.info("Applying map() for record with key... " + key.trim()); return new KeyValue&lt;&gt;(key, record); }).to(r.getKafkaTopic()); 抱歉,无法格式化代码块。
  • 我正在写不同的主题。但似乎不起作用。我在地图和过滤器的 lambda 函数中有一些日志消息。未打印日志消息。
  • 不确定filteredRecord 是什么。我添加了一个答案。希望这可以帮助。顺便说一句:您也可以更新问题;)
  • 没关系,我的想法不正确。我的代码工作正常。我把日志放在了错误的地方(新手错误:()...非常感谢你的帮助@MatthiasJ.Sax。:)

标签: apache-kafka apache-kafka-streams kafka-producer-api


【解决方案1】:

假设您有 3 个过滤谓词 p1p2p3,您可以这样做:

KStream stream = ...
stream.filter(p1).to("output-1");
stream.filter(p2).to("output-2");
stream.filter(p3).to("output-3");

// or as a loop
Predicate[] predicate = new Predicate[]{p1,p2,p3};
String[] outputTopic = new String[]{"output-1","output-2","output-3"};
for(int i = 0; i < 3; ++i) {
    stream.filter(predicate[i]).to(outputTopic[i]);
}

如果您有谓词输出主题对的集合,这也应该通过 foreach() 和 lambda 表达式起作用。

【讨论】:

  • 当我尝试这样的事情时,它不会占用其他过滤器。似乎只应用了第一个过滤器。
  • 输出主题的数量也不固定。它们可以随时变化。
  • 您能在问题中分享您当前的代码吗?您还可以发布Topology#describe() 打印的内容吗? “而且输出主题的数量也不固定。” -- 你的意思是在运行时?这将不被支持。程序在执行期间不能更改。也许你可以通过处理器 API 构建更动态的东西,并在 to() 中使用动态路由。不是 100% 确定这是否可行
【解决方案2】:

我猜你需要在KStream 上使用branch 方法和多个谓词(过滤器),如下所示:

Predicate<Object, String>[] branchingPredicates = ...;
KStream<Object, String>[] branchingStreams = kStream.branch(branchingPredicates);

for (int branchingIndex = 0; branchingIndex < branchingStreams.length; branchingIndex++) {
    branchingStreams[branchingIndex].map((k,v) -> { ... }).to(specificKafkaTopic);
}

【讨论】:

  • 使用branch(),每条记录都将被发送到一个输出流——看来,这个问题意味着输入流应该被“广播”以在不同的条件下进行多次过滤。
  • 没错,分支会做一个“第一次匹配”。我需要对所有记录应用所有过滤器。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-08
  • 1970-01-01
  • 1970-01-01
  • 2021-11-16
  • 2018-08-19
  • 1970-01-01
相关资源
最近更新 更多