【发布时间】:2018-07-27 08:00:31
【问题描述】:
我有一个像下面这样的简单 java 生产者
public class Producer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";
public static void main( String[] args ) throws Exception {
Producer<String, byte[]> producer = createProducer();
for(int i=0;i<3000;i++) {
String msg = "Test Message-" + i;
final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
producer.send(record).get();
System.out.println("Sent message " + msg);
}
producer.close();
}
private static Producer<String, byte[]> createProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("client.id", "AppFromJava");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.codec", "snappy");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<String, byte[]>(props);
}
}
我正在尝试读取如下数据
public class Consumer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";
public static void main( String[] args ) throws Exception {
Consumer<String, byte[]> consumer = createConsumer();
start(consumer);
}
static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
final int giveUp = 10;
int noRecordsCount = 0;
int stopCount = 1000;
while (true) {
final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
// Process the record System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
});
consumer.commitSync();
break;
}
consumer.close();
System.out.println("DONE");
}
private static Consumer<String, byte[]> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
props.put("enable.auto.commit", "false");
// Create the consumer using props.
final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
}
但是消费者没有从 kafka 读取任何消息。如果我在start()
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());
然后消费者开始阅读主题。但是每次重新启动消费者时,它都会从我不想要的主题的开头读取消息。如果我在启动 Consumer 时添加以下配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
然后它从主题中读取消息,但如果消费者在处理所有消息之前重新启动,则它不会读取未处理的消息。
谁能告诉我出了什么问题,我该如何解决这个问题?
Kafka broker 和 zookeeper 使用默认配置运行。
【问题讨论】:
-
如果设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),是消费者读取一整批消息的问题,它在停止之前只处理部分消息,重新启动它会跳过读取的整批消息前?如果是这种情况,那么问题可能是所有读取消息后的偏移量都被自动提交。您可能希望禁用自动提交并提交实际处理的偏移量,即使读取了更多消息或减少消费者将读取的最大批量大小。 -
在我发布在
createConsumer()的代码中,我正在设置这个props.put("enable.auto.commit", "false");我遇到的问题是假设有5000 条消息。然后假设消费者在通过 commitSync 提交此批次后,收到了 1000 条消息。如果消费者重新启动,那么我看不到消费者从 1001 开始收到任何消息。如果我不清楚我想问什么,请告诉我。 -
好的。那我的评论就没有实际意义了。
-
我面临错误:- java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V
标签: java apache-kafka kafka-consumer-api kafka-producer-api