【问题标题】:Print from Apache Storm Bolt从 Apache Storm Bolt 打印
【发布时间】:2015-12-03 05:57:18
【问题描述】:

我正在研究一些 Storm 拓扑和螺栓的示例代码,但我遇到了一些奇怪的事情。我的目标是使用 Storm 设置 Kafka,以便 Storm 可以处理 Kafka 总线上可用的消息。我定义了以下螺栓:

public class ReportBolt extends BaseRichBolt {

  private static final long serialVersionUID = 6102304822420418016L;

  private Map<String, Long> counts;
  private OutputCollector collector;

  @Override @SuppressWarnings("rawtypes")
  public void prepare(Map stormConf, TopologyContext context, OutputCollector outCollector) {
    collector = outCollector;
    counts = new HashMap<String, Long>();
  }

  @Override

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // terminal bolt = does not emit anything
  }

  @Override
  public void execute(Tuple tuple) {   
    System.out.println("HELLO " + tuple);
  }

  @Override
  public void cleanup() {
    System.out.println("HELLO FINAL");
  }
}

本质上,它应该只是输出每个 Kafka 消息;当调用清理函数时,应该会出现不同的消息。

我查看了工作日志,找到了最后一条消息(即“HELLO FINAL”),但找不到带有“HELLO”的 Kafka 消息。据我所知,这应该是一个简单的打印机螺栓,但我看不出哪里出错了。工作人员日志表明我已连接到 Kafka 总线(它获取偏移量等)。

简而言之,为什么我的println 没有出现在工作日志中?

编辑

public class AckedTopology {

  private static final String SPOUT_ID = "monitoring_test_spout";
  private static final String REPORT_BOLT_ID = "acking-report-bolt";
  private static final String TOPOLOGY_NAME = "monitoring-topology";

  public static void main(String[] args) throws Exception {
    int numSpoutExecutors = 1;

    KafkaSpout kspout = buildKafkaSpout();
    ReportBolt reportBolt = new ReportBolt();

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
    builder.setBolt(REPORT_BOLT_ID, reportBolt);

    Config cfg = new Config();
    StormSubmitter.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
  }

  private static KafkaSpout buildKafkaSpout() {
    String zkHostPort = "URL";
    String topic = "TOPIC";

    String zkRoot = "/brokers";
    String zkSpoutId = "monitoring_test_spout_id";
    ZkHosts zkHosts = new ZkHosts(zkHostPort);

    SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
    KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
    return kafkaSpout;
  }
}

【问题讨论】:

  • 你为什么首先使用 println? Storm 应该使用 logback 来记录文件。此外,println 对延迟 (IO) 不利。
  • 分享你是如何构建拓扑的。它是否与 spout 正确链接?
  • @zenbeni 我只想检查一切是否正常。这绝不是最终的解决方案,只是一个健全的检查。

标签: java apache-storm


【解决方案1】:

您的螺栓没有与喷口相连。你需要使用storm的分组才能做到这一点..使用类似这样的东西

    builder.setBolt(REPORT_BOLT_ID, reportBolt).shuffleGrouping(SPOUT_ID);

setBolt 通常返回一个InputDeclarer 对象。在你的情况下,通过指定shuffleGrouping(SPOUT_ID),你告诉storm你有兴趣使用id为REPORT_BOLT_ID的组件发出的所有元组。

阅读更多stream groupings 并根据您的需要选择一个。

【讨论】:

    猜你喜欢
    • 2015-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多