环境准备:

1)需要在 Maven 工程中引入依赖:

 1  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
 2     <dependency>
 3       <groupId>org.apache.kafka</groupId>
 4       <artifactId>kafka_2.12</artifactId>
 5       <version>1.1.0</version>
 6     </dependency>
 7     <dependency>
 8       <groupId> org.apache.cassandra</groupId>
 9       <artifactId>cassandra-all</artifactId>
10       <version>0.8.1</version>
11 
12       <exclusions>
13         <exclusion>
14           <groupId>org.slf4j</groupId>
15           <artifactId>slf4j-log4j12</artifactId>
16         </exclusion>
17         <exclusion>
18           <groupId>log4j</groupId>
19           <artifactId>log4j</artifactId>
20         </exclusion>
21       </exclusions>
22 
23     </dependency>

2)确认本机能否 telnet 通 192.178.0.111 9092(kafka 所部署的 VMware 虚拟机)。如果 telnet 端口不通,则需要关闭 192.178.0.111 上的防火墙:

systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动

一、生产者

首先看以下两种实现示例:

package com.dx;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public class ProducerTest {
    public static void main(String[] args) {
        producer_test1(args);

        producer_test2();
    }

    private static void producer_test2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.178.0.111:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("kafakatopic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }

    private static void producer_test1(String[] args) {
        String arg0 = args != null && args.length > 0 ? args[0] : "10";
        long events = Long.parseLong(arg0);
        Random rnd = new Random();

        //    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.178.0.111:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 配置partitionner选择策略,可选配置
        props.put("partitioner.class", "com.dx.SimplePartitioner2");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.178.0." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<String, String>("kafakatopic", ip, msg);
            Future<RecordMetadata> send = producer.send(data,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        }
        producer.close();
    }
}
SimplePartitioner2.java
 1 package com.dx;
 2 
 3 import java.util.List;
 4 import java.util.Map;
 5 
 6 import org.apache.kafka.clients.producer.Partitioner;
 7 import org.apache.kafka.common.Cluster;
 8 import org.apache.kafka.common.PartitionInfo;
 9 
10 public class SimplePartitioner2 implements Partitioner {
11     public void configure(Map<String, ?> map) {
12     }
13 
14     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
15         int partition = 0;
16         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
17         int numPartitions = partitions.size();
18         String stringKey = (String) key;
19         int offset = stringKey.lastIndexOf('.');
20         if (offset > 0) {
21             partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
22         }
23 
24         return partition;
25     }
26 
27     public void close() {
28     }
29 }
View Code

相关文章: