【问题标题】:processes all pre processed records again from Kafka when restart Storm重新启动 Storm 时再次处理来自 Kafka 的所有预处理记录
【发布时间】:2019-12-30 02:36:30
【问题描述】:

我正在从 Kafka 消费者读取数据到 Storm spout。但是,当我重新启动 Storm 时,它还会从 Kafka 读取先前处理过的记录。 重新启动时,我不想处理以前处理的记录。 这是我的代码:

public class KafkaStormSample {
    public static void main(String[] args) throws Exception {

        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
    }
}

【问题讨论】:

    标签: java apache-kafka apache-storm


    【解决方案1】:

    问题在于您用于 SpoutConfig 的随机 UUID。相反,选择一个固定的字符串并每次都使用它。

    无关:您不应该使用storm-kafka 编写新代码。请改用storm-kafka-client

    【讨论】:

    • 我尝试使用固定字符串,但没有成功。它仍然处理所有预处理记录。是否与LocalCluster 相关,因为一旦服务重新启动,它会处理以前的记录?
    • 不要将 LocalCluster 用于生产工作负载。是的,这是因为 LocalCluster 使用嵌入式 Zookeeper 运行,所以当您重新启动集群时,所有状态都会被擦除。
    • 是的,问题出在LocalCluster。现在对于生产环境,我使用的是StormSubmitter,它完美地维护了记录状态。
    【解决方案2】:

    除了静态 UUID,您还可以使用 StormSubmitter 提交拓扑以在 Storm 集群上运行。更多信息here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-25
      • 2015-05-06
      • 2020-10-31
      • 2021-10-11
      • 1970-01-01
      • 2020-02-12
      相关资源
      最近更新 更多