【发布时间】:2020-06-22 21:52:06
【问题描述】:
我使用的是 kafka 处理器 api(不是 DSL)
public class StreamProcessor implements Processor<String, String>
{
public ProcessorContext context;
public void init(ProcessorContext context)
{
this.context = context;
context.commit()
//statestore initialized with key,value
}
public void process(String key, String val)
{
try
{
String[] topicList = stateStore.get(key).split("|");
for(String topic: topicList)
{
context.forward(key,val,To.child(consumerTopic));
} // forward same message to list of topics ( 1..n topics) , rollback if write to some topics failed ?
}
}
}
场景:我们正在从源主题和流中读取数据 处理器将数据写入多个接收器主题(上面的主题列表)。
问题:如何使用kafka流实现回滚机制 处理器 api 当上面的 topicList 中的一个或多个主题时 收不到消息? .
据我所知,处理器 api 对每个都有回滚机制 记录它发送失败,或者可以回滚一整批 失败的消息也能实现?作为工艺方法 处理器接口是按记录而不是按批次调用的,因此我 会猜测它只能按记录完成。这是正确的假设吗?,如果不是,请建议 如何使用处理器 api 实现每条记录和每批失败主题的回滚。
【问题讨论】:
-
您是否尝试向此处理器添加多个接收器并使用
To.all()context.forward(key, val, To.all());? -
@JavaTechnical,谢谢,是的,我有,但是如果消息传递到某个主题失败(如果主题由于某种原因不可用),它与回滚机制有什么关系。此外,我想在失败的情况下将数据记录到错误主题,我猜 context.forward 到非接收节点已被贬低..所以我猜错误主题必须是接收节点,对吗?在那种情况下,我可能不想使用 To.all ,如何通过它?
标签: apache-kafka apache-kafka-streams