基本思路:在kafka生产者生产消息时,把相同userId的消息落在同一个分区/partition

 

	public void sendTopic1(String tpoic, String userId, String message) {
		Properties props = new Properties();
        //集群地址,多个服务器用","分隔
        props.put("bootstrap.servers", servers);
        //key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.required.acks", "all");
        //创建生产者
        int partitionNum = 0;
        if (StringUtils.isBlank(userId)) { //之前介绍过 Key 是可以传空值的
            partitionNum = new Random().nextInt(11);   //随机
        } else {
            //取 %
            partitionNum = Math.abs((userId.hashCode()) % 11);
        }
        log.info("发送topic的partition索引:{}", partitionNum);
        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(tpoic, partitionNum, userId, message);
        producer.send(producerRecord);
        producer.close();
	}

  

相关文章:

  • 2022-02-26
  • 2022-12-23
  • 2022-12-23
  • 2021-10-05
  • 2021-10-09
  • 2021-08-11
  • 2021-05-22
  • 2021-10-11
猜你喜欢
  • 2021-08-23
  • 2021-06-02
  • 2021-08-30
  • 2022-12-23
  • 2021-09-23
  • 2022-01-14
  • 2022-12-23
相关资源
相似解决方案