【发布时间】:2019-12-30 02:36:30
【问题描述】:
我正在从 Kafka 消费者读取数据到 Storm spout。但是,当我重新启动 Storm 时,它还会从 Kafka 读取先前处理过的记录。 重新启动时,我不想处理以前处理的记录。 这是我的代码:
public class KafkaStormSample {
public static void main(String[] args) throws Exception {
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
}
}
【问题讨论】:
标签: java apache-kafka apache-storm