1.eclipse运行消费者代码。代码如下
1 package cn.test.mykafka; 2 3 import java.util.Arrays; 4 import java.util.Properties; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.KafkaConsumer; 9 10 11 /** 12 * 简单消费者 13 * 14 */ 15 16 public class SimpleConsumer { 17 18 public static void main(String[] args) { 19 20 Properties props = new Properties(); 21 props.put("bootstrap.servers", "192.168.42.133:9092"); 22 props.put("group.id", "group1"); 23 props.put("enable.auto.commit", "true"); 24 props.put("auto.commit.interval.ms", "1000"); 25 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 26 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 27 28 @SuppressWarnings("resource") 29 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 30 consumer.subscribe(Arrays.asList("test-topic")); //订阅主题 31 32 while (true) { 33 @SuppressWarnings("deprecation") 34 ConsumerRecords<String, String> records = consumer.poll(100); 35 for (ConsumerRecord<String, String> record : records) 36 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 37 } 38 } 39 }