【问题标题】:Empty data while reading data from kafka using Trident Topology使用 Trident Topology 从 kafka 读取数据时清空数据
【发布时间】:2015-10-24 18:49:20
【问题描述】:

我是 Trident 的新手。我正在编写一个从 kafka 读取数据的三叉戟拓扑。主题名称是“测试”。我有本地卡夫卡设置。我在本地启动了 zookeeper,kafka。并在 kafka 中创建了一个主题“测试”并打开了生产者并输入了消息“Hello Kafka!”。

我想使用 trident 从“测试”主题中读取消息“Hello Kafka”。

下面是我的代码。我得到了空元组。

    TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

这是我的 TestFilter 类代码

public TestFilter()
{
    //
}

@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

每当我在 kafka 生产者中向“测试”主题键入消息时,首先打印 sysout,但它没有通过 if 循环。我只是收到消息'TestFilter 被调用......'仅此而已。

我想将我生成的实际数据用于“测试”主题。怎么样?

【问题讨论】:

  • 你能看到使用控制台消费者脚本的消息吗?
  • 是的,我可以使用控制台消费者脚本看到该消息。
  • 可以改一下config.forceFromStart=true
  • 我添加了新字段(“str”)。它开始工作了。谢谢。
  • @Kutty 您应该为您自己的问题写一个答案以供后代使用。我遇到了同样的问题,但问题的根源不是很明显。

标签: apache-kafka trident


【解决方案1】:

问题在于 Stream.each 的参数。该方法的 javadoc 的相关部分是:

each(Fields inputFields, Filter filter)

文档不太清楚,但语义是您应该使用 inputFields 参数指定过滤器使用的所有字段。

Storm 然后将在输入元组上应用投影并将其转发给 过滤器

鉴于您没有指定任何输入字段,投影会产生一个空元组,从而导致过滤器内的tuple.getValues().size()>0 条件失败。

值得一提的是每个的其他变体:

each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)

这些会将提供的 function 应用于输入元组的投影,将结果元组附加到原始输入元组,将新字段重命名为 functionFields(即投影仅用于应用函数)。

特别是第二个版本等效于在 inputFields 设置为 null(或 new Fields())的情况下调用每个,并将导致将空元组传递给 function

【讨论】:

    猜你喜欢
    • 2016-03-23
    • 2017-01-26
    • 2016-11-08
    • 2015-05-20
    • 2018-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多