I. Client

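Starting from the simple producer created in the previous post, make two changes:

1. Create SimplePartitioner.java, whose partition method always returns partition 1.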


The code for SimplePartitioner.java is as follows:

package cn.test.mykafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * A simple partitioner that sends every record to partition 1.
 */
public class SimplePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration is needed for this partitioner.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 1; // always route to partition 1 (partitions are numbered from 0)
    }

    @Override
    public void close() {
        // Nothing to release.
    }

}
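
Hard-coding partition 1 only works when the topic has at least two partitions, because Kafka numbers partitions from 0. As a minimal sketch of what a less rigid partition method could compute, the plain-Java helper below maps the serialized key bytes to a partition index. PartitionSketch and choosePartition are hypothetical names that do not appear in the original post or in the Kafka API. The hash used here is Arrays.hashCode, not the murmur2 hash that Kafka's built-in partitioner applies to keyed records. In a real Partitioner the partition count would have to come from the Cluster argument, for example cluster.partitionsForTopic(topic).size().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of the Kafka API.
final class PartitionSketch {

    // Picks a partition in [0, numPartitions) for the given serialized key.
    // Records without a key fall back to partition 0 in this sketch.
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (keyBytes == null) {
            return 0;
        }
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // The same key always maps to the same partition for a fixed partition count.
        System.out.println(choosePartition(key, 3));
    }
}
```

Inside a Partitioner implementation, the partition method could delegate to such a helper instead of returning a constant, so that records with the same key stay together while different keys spread across the topic.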

2. Copy SimpleProducer.java to PartitionerProducer.java and make three changes:

Add a partitioner.class setting; change the topic to test-topic2; change the message to hello world to partition 1 from win7 client.

package cn.test.mykafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A producer that uses a custom partitioner.
 */
public class PartitionerProducer {

    public static void main(String[] args) {

        // Build the producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.133:9092"); // broker host and port
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "cn.test.mykafka.SimplePartitioner"); // custom partitioner

        // Create a producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send the message
        ProducerRecord<String, String> msg = new ProducerRecord<>("test-topic2", "hello world to partition 1 from win7 client");
        producer.send(msg);
        //for (int i = 0; i < 10; i++)
        //    producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); // topic, key, value

        System.out.println("over");
        producer.close();
    }
}
