【问题标题】:Storm Trident Topology missing tuples while placing them in HDFSStorm Trident 拓扑在将元组放入 HDFS 时丢失了元组
【发布时间】:2015-11-13 16:34:18
【问题描述】:

我正在运行一个风暴三叉戟拓扑,在两个不同的流中有两个不同的 spout。我的 spout 是 JMS spout,使用 HDFS State 来持久化元组。

如果我只运行一个 spout 就可以正常工作,我会将所有记录发布到 HDFS 中的 JMS 队列中。

在使用连接到两个不同队列的两个 spout 运行拓扑时,与我在 QUEUE 中发布的记录相比,我得到的记录更少。我在这里做错了什么吗。如果我这样做的方式有任何问题,请告诉我。

    TridentTopology topology = new TridentTopology();       
    topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());       

    Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
    TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());

【问题讨论】:

    标签: hadoop jms hdfs apache-storm trident


    【解决方案1】:

    以下配置适用于拓扑。

    这是因为我没有使用分区分组。

    使用全局和批处理全局后,它工作正常。

    shuffle:使用随机循环算法在所有目标分区中均匀地重新分配元组

    广播:每个元组都复制到所有目标分区。这在 DRPC 期间很有用——例如,如果您需要对每个数据分区执行 stateQuery。

    partitionBy:partitionBy 接受一组字段,并根据该组字段进行语义分区。这些字段根据目标分区的数量进行散列和修改以选择目标分区。 partitionBy 保证同一组字段总是进入同一个目标分区。

    全局:所有元组都发送到同一个分区。为流中的所有批次选择相同的分区。

    batchGlobal:批处理中的所有元组都发送到同一个分区。流中的不同批次可能会进入不同的分区。

    partition:该方法接受一个自定义的分区函数,该函数实现了 backtype.storm.grouping.CustomStreamGrouping

    以下拓扑配置可以正常工作。

        TridentTopology topology = new TridentTopology();       
        topology.newStream("QueueSpout", TridentSpout).batchGlobal().partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());     
    
        Stream TopicStream = topology.newStream("TopicSpout",   TridentTopicSpout).global();
        TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
        TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
        TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-20
      • 1970-01-01
      • 1970-01-01
      • 2019-05-24
      相关资源
      最近更新 更多