【问题标题】:Roll back mechanism in kafka processor api?kafka处理器api中的回滚机制?
【发布时间】:2020-06-22 21:52:06
【问题描述】:

我使用的是 kafka 处理器 api(不是 DSL)

public class StreamProcessor implements Processor<String, String> 
{

    public ProcessorContext context;

    public void init(ProcessorContext context) 
    {
        this.context = context;
        context.commit()
        //statestore initialized with key,value
    }

    public void process(String key, String val)
    {
        try
        {
            String[] topicList = stateStore.get(key).split("|"); 
            for(String topic: topicList) 
            {
                    context.forward(key,val,To.child(consumerTopic)); 
            } // forward same message to list of topics ( 1..n topics) , rollback if write to some topics failed ? 
        }
    }
}

场景:我们正在从源主题和流中读取数据 处理器将数据写入多个接收器主题(上面的主题列表)。

问题:如何使用kafka流实现回滚机制 处理器 api 当上面的 topicList 中的一个或多个主题时 收不到消息? .

据我所知,处理器 api 对每个都有回滚机制 记录它发送失败,或者可以回滚一整批 失败的消息也能实现?作为工艺方法 处理器接口是按记录而不是按批次调用的,因此我 会猜测它只能按记录完成。这是正确的假设吗?,如果不是,请建议 如何使用处理器 api 实现每条记录和每批失败主题的回滚。

【问题讨论】:

  • 您是否尝试向此处理器添加多个接收器并使用To.all()context.forward(key, val, To.all());
  • @JavaTechnical,谢谢,是的,我有,但是如果消息传递到某个主题失败(如果主题由于某种原因不可用),它与回滚机制有什么关系。此外,我想在失败的情况下将数据记录到错误主题,我猜 context.forward 到非接收节点已被贬低..所以我猜错误主题必须是接收节点,对吗?在那种情况下,我可能不想使用 To.all ,如何通过它?

标签: apache-kafka apache-kafka-streams


【解决方案1】:

您需要自己实现它。例如,您可以使用两个存储:主存储和“缓冲”存储,首先只更新缓冲存储,然后调用context.forward() 确保所有写入都在输出主题中,然后合并“缓冲”存储进入总店。

如果需要回滚,则从缓冲存储中删除内容。

【讨论】:

    猜你喜欢
    • 2019-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-07
    • 1970-01-01
    • 1970-01-01
    • 2018-12-27
    • 2017-10-07
    相关资源
    最近更新 更多