【发布时间】:2018-08-21 17:29:49
【问题描述】:
我有一个过滤器列表(其中 schema_field='val')和相应主题的要求。我需要遍历这些过滤器列表并应用它们,然后使用 KStreams 将过滤后的记录值写入其特定主题。有这样的功能吗?
例子:
synchronized (subscriberFilterRequirements) {
Iterator<SubscriberFilterRequirements> itr = subscriberFilterRequirements.iterator();
while (itr.hasNext()) {
SubscriberFilterRequirements req = itr.next();
log.info("*** Applying transformations on record");
KStream<String, GenericRecord> subscriberFilteredRecord = filteredRecord;
if (req.getPipelineSubscriptions().getFiltersql() != null && !req.getPipelineSubscriptions().getFiltersql().isEmpty()) {
subscriberFilteredRecord = filteredRecord.filter((key, value) -> {
String[] filter = req.getPipelineSubscriptions().getFiltersql().trim().split("=");
return value.get(filter[0]).toString().equalsIgnoreCase(filter[1]);
})
}
Schema schema = Utils.getAvroSchema(req.getPipelineSubscriptions().getSubscriberSchemaLocation(),
req.getPipelineSubscriptions().getSubscriberSchemaLocationType());
GenericRecord sinkRecord = new GenericData.Record(schema);
List<Schema.Field> schemaFieldsList = schema.getFields();
Iterator<Schema.Field> sinkIterator = schemaFieldsList.iterator();
subscriberFilteredRecord.map((key, value) -> {
fillAvroRecord(sinkRecord, sinkIterator, value);
return new KeyValue<>(key, sinkRecord);
}).to(req.getPipelineSubscriptions().getKafkaTopic());
}
}
目前,正在发生的事情是,循环的上下文和 KStream 的上下文不一样。开始流式传输时,循环第一次执行良好,即 KStream 接收第一个过滤器,从那时起,KStream 像无限循环一样运行,而不使用第二个过滤器。我想注入其余的过滤器,一个接一个地应用到记录上。
【问题讨论】:
-
您的代码示例应该可以工作。当然,您可能希望在
to()操作中使用不同的主题名称。您可以先将每个过滤器映射到相应的输出主题,然后将forEach应用于 filter-output-topic-pairs 以使每个过滤器具有不同的输出主题。 -
以下示例:
filteredRecord.filter((key, value) -> { log.info("Applying filter() for record with key... " + key.trim()); String[] filter = req.getPipelineSubscriptions().getFiltersql().trim().split("="); return value.get(filter[0]).toString().equalsIgnoreCase(filter[1]); }).map((key, value) -> { log.info("Applying map() for record with key... " + key.trim()); return new KeyValue<>(key, record); }).to(r.getKafkaTopic());抱歉,无法格式化代码块。 -
我正在写不同的主题。但似乎不起作用。我在地图和过滤器的 lambda 函数中有一些日志消息。未打印日志消息。
-
不确定
filteredRecord是什么。我添加了一个答案。希望这可以帮助。顺便说一句:您也可以更新问题;) -
没关系,我的想法不正确。我的代码工作正常。我把日志放在了错误的地方(新手错误:()...非常感谢你的帮助@MatthiasJ.Sax。:)
标签: apache-kafka apache-kafka-streams kafka-producer-api