不需要中间客户端、系统、kafka 流花哨的工具或奇迹框架,专注于旧的简单方法:重用该生产者以及生成和发送完整的 POJOS。
Producers 是线程安全的,因此使用相同的生产者实例通过不同的线程生产两个或多个主题是完全可以的。
这只是一个简化,因为我不知道你的实现细节。我假设消息(POJO)是一个简单的String。使用一些想象力,相信不同的字母是字段。从fullPojo,您想向另一个主题发送仅包含两个字段(表示为y 和v)的消息。
String fullPojo = "xxxxxyxxxxv";
//some logic to extract the desired fields
String shortPojo = getDesiredFields(fullPojo);
/* shortPojo="yv" */
在您的 Kafka 集群上创建一个新主题,在本例中,它将被称为 shortPojoTopic。
只需使用相同的producer 将完整数据发送到您的原始主题,方法是进行第二次调用,以便使用仅包含过滤值的消息填充短主题:
producer.send(new ProducerRecord<String, String>(fullPojoTopic, fullPojo));
producer.send(new ProducerRecord<String, String>(shortPojoTopic, shortPojo));
第二次调用也可以从另一个辅助线程完成。如果您希望在这里完成多线程,您可以定义第二个线程来执行“过滤”工作。只需将原始 producer 引用传递给第二个线程,并将两个线程与包含 fullPojos 的 FIFO 结构 (deques, queues,...) 链接。
- 原线程将fullPojo发送到
fullPojoTopic主题,并将fullPojo推入队列。
- 此辅助“过滤器”线程将从队列/双端队列中删除顶部消息,提取创建 shortPojo 所需的字段,并将其发送到
shortPojoTopic(使用相同的 producer,无需担心生产者同步问题)。
如果主题之一处于错误状态并且无法接受更多消息,或者主题之一位于刚刚失败的不同 Kafka 集群上(在这种情况下,您也需要两个不同的生产者),或者即使过滤过程在过滤某些格式错误的消息时发现了一些困难。
例如,即使shortPojoTopic out,也不会影响第一个线程的性能,因为它将继续发送他的 fullPojos 而不会出现问题/延迟。
时刻注意内存使用:队列/双端队列的大小应该以某种方式限制/控制,以避免第二个线程被卡住很长时间,或者如果它不能跟随节奏时出现 OOM的第一个线程。如果发生这种情况,它将无法足够快地读取/删除消息,从而产生可能导致上述 OOM 问题的延迟
此外,即使没有主题/代理存在问题,这种分离也将提高总体性能,因为原始线程不必等待每次迭代时在其线程中发生的过滤过程。
第一个线程只是发送 POJO;第二个线程只是过滤并发送短 POJO。 简单的职责,全部并行。
假设您可以控制生产者及其发送的内容,我建议将逻辑直接放在那里,以避免其他中间系统(流,...)。只需提取核心代码中的字段并使用相同的生产者将恢复的 Pojo 生成到另一个主题。只使用一个线程或尽可能多地使用。
我打赌我自己的房子和右手,这比你能想到的任何流实用程序都要快得多。
如果您无权访问该代码,您可以创建一个中间消费者-生产者服务,在下一节继续。
如果您只能访问完整的 POJO 主题,而不能访问上一步(生成消息并将它们发送到主题的代码),则第二个选项可能是创建一个中间 kafka 消费者-生产者,它使用来自fullPojoTopic 的消息,提取字段并将过滤的shortPojo 生成到shortPojoTopic。
请注意,逻辑与第一种方法中的逻辑相同,但此解决方案意味着更多的资源浪费:新的生产者和消费者线程(相信我,它们创建了一个大量的辅助线程),一个新的消费者组来管理,fullPOJO 消息在线上的双重传输,等等。
我的意见是,仅当您无法直接访问以第一种方式生成和生成完整 POJOS 的代码,或者您希望更好地控制过滤完整 POJOS 的微服务时,才应使用此选项数据并将所需字段发送到另一个主题。