Kafka旧版本producer由scala编写,0.9以后已经废除,但是很多公司还在使用0.9以前的版本,所以总结如下:
要注意包Producer是 kafka.javaapi.producer.Producer 这个才是java api使用的包

示例代码如下:

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class ProducerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
		properties.put("serializer.class", "kafka.serializer.StringEncoder");
		properties.put("request.requird.acks", "1");
		ProducerConfig config = new ProducerConfig(properties);
		Producer<String, String> producer = new Producer<String, String>(config);
		KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","key","hello");
		producer.send(msg);
	}
	
}

自定义partition示例代码如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
        return partition;
    }

}

大数据流动 专注于大数据实时计算,数据治理,数据可视化等技术分享与实践。
请在后台回复关键字下载相关资料。相关学习交流群已经成立,欢迎加入~

Kafka 0.8 Producer (0.9以前版本适用)

相关文章:

  • 2022-12-23
  • 2021-07-04
  • 2021-11-07
  • 2021-09-27
  • 2022-12-23
  • 2022-12-23
  • 2022-03-02
猜你喜欢
  • 2021-11-21
  • 2021-12-24
  • 2021-12-21
  • 2021-06-30
  • 2022-12-23
  • 2022-01-10
  • 2021-06-11
相关资源
相似解决方案