环境准备:
1)需要在maven工程中引入依赖:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>0.8.1</version>

    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>

</dependency>
2)确认本机能否 telnet 通 192.178.0.111 9092(Kafka 所部署的 VMware 虚拟机)。如果 telnet 端口不通,则需要关闭 192.178.0.111 上的防火墙:
systemctl stop firewalld.service     # 停止firewall
systemctl disable firewalld.service  # 禁止firewall开机启动
一、生产者
首先看以下两种实现示例:
package com.dx; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.Random; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.Date; public class ProducerTest { public static void main(String[] args) { producer_test1(args); producer_test2(); } private static void producer_test2() { Properties props = new Properties(); props.put("bootstrap.servers", "192.178.0.111:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>("kafakatopic", Integer.toString(i), Integer.toString(i))); producer.close(); } private static void producer_test1(String[] args) { String arg0 = args != null && args.length > 0 ? 
args[0] : "10"; long events = Long.parseLong(arg0); Random rnd = new Random(); // /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic Properties props = new Properties(); props.put("bootstrap.servers", "192.178.0.111:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 配置partitionner选择策略,可选配置 props.put("partitioner.class", "com.dx.SimplePartitioner2"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = "192.178.0." + rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; ProducerRecord<String, String> data = new ProducerRecord<String, String>("kafakatopic", ip, msg); Future<RecordMetadata> send = producer.send(data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } }); } producer.close(); } }
SimplePartitioner2.java
1 package com.dx; 2 3 import java.util.List; 4 import java.util.Map; 5 6 import org.apache.kafka.clients.producer.Partitioner; 7 import org.apache.kafka.common.Cluster; 8 import org.apache.kafka.common.PartitionInfo; 9 10 public class SimplePartitioner2 implements Partitioner { 11 public void configure(Map<String, ?> map) { 12 } 13 14 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 15 int partition = 0; 16 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 17 int numPartitions = partitions.size(); 18 String stringKey = (String) key; 19 int offset = stringKey.lastIndexOf('.'); 20 if (offset > 0) { 21 partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions; 22 } 23 24 return partition; 25 } 26 27 public void close() { 28 } 29 }