【发布时间】:2015-10-24 18:49:20
【问题描述】:
我是 Trident 的新手。我正在编写一个从 kafka 读取数据的三叉戟拓扑。主题名称是“测试”。我有本地卡夫卡设置。我在本地启动了 zookeeper,kafka。并在 kafka 中创建了一个主题“测试”并打开了生产者并输入了消息“Hello Kafka!”。
我想使用 trident 从“测试”主题中读取消息“Hello Kafka”。
下面是我的代码。我得到了空元组。
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
这是我的 TestFilter 类代码
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
每当我在 kafka 生产者中向“测试”主题键入消息时,首先打印 sysout,但它没有通过 if 循环。我只是收到消息'TestFilter 被调用......'仅此而已。
我想将我生成的实际数据用于“测试”主题。怎么样?
【问题讨论】:
-
你能看到使用控制台消费者脚本的消息吗?
-
是的,我可以使用控制台消费者脚本看到该消息。
-
可以改一下
config.forceFromStart=true -
我添加了新字段(“str”)。它开始工作了。谢谢。
-
@Kutty 您应该为您自己的问题写一个答案以供后代使用。我遇到了同样的问题,但问题的根源不是很明显。
标签: apache-kafka trident