kafkautil:


import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import org.springframework.beans.factory.annotation.Value;

public class KafkaUtil {
    
    @Value("#{sys['connect']}")
    private static String zkConnect ;
    @Value("#{sys['metadata.broker.list']}")
    private static String brokerList;
    @Value("#{sys['request.required.acks']}")
    private static String ack;
    
    private static Producer<String, String> producer = null;
    
    /*static{
        Properties p = PropertiesUtil.getProperties("kafka.properties");
        zkConnect = (String) p.get("zk.connect");
        brokerList = (String) p.get("metadata.broker.list");
        ack = (String) p.get("request.required.acks");
        topic = (String) p.get("topic.imeidata");
    }
    */
    public static Producer<String,String> getProducer(){
        if(producer == null){
            Properties p = PropertiesUtil.getProperties("kafka.properties");
            zkConnect = (String) p.get("zk.connect");
            brokerList = (String) p.get("metadata.broker.list");
            ack = (String) p.get("request.required.acks");
            
            Properties props = new Properties();
            props.put("zk.connect", zkConnect);
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", ack);
            props.put("producer.type", "async");//是否同步 sync:同步   async:异步  
            props.put("partitioner.class", "com.kafka.SendPartitioner");//发送到多个分区进行分布式存储的分区算法类
            
            props.put("request.timeout.ms", "50000");
            props.put("queue.buffering.max.ms", "10000");//默认值5000  异步模式下,每隔此时间间隔会将缓冲的消息提交一次
            props.put("batch.num.messages", "1000");//默认值200  异步模式下,一次批量提交消息的条数,
                                                    //但如果间隔时间超过 queue.buffering.max.ms 的值,不管有没有达到批量提交的设值,都会进行一次提交
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
        }
        return producer;
    }
   
}
View Code
kafka消息发送类的属性:
1:zk.connect:zk服务端连接地址

2:metadata.broker.list:zk客户端地址
3:serializer.class:kafka消息发送序列化格式

4:request.required.acks:是否确认消息消费机制 它有三个选项:1,0,-1
0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。经测试,每10K消息大约会丢几百条消息。

1,意味着在leader replica已经接收到数据后,producer会得到一个ack。这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会 丢失。

     -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失。经测试,100W条消息没有丢消息。

  

5:request.timeout.ms:请求超时
6:producer.type 是否同步 它有两个选项 sync:同步   async:异步  同步模式下,每发送一次消息完毕才会返回 在异步模式下,可以选择异步参数。
7:queue.buffering.max.ms:默认值5000  异步模式下,每隔此时间间隔会将缓冲的消息提交一次
8:batch.num.messages:默认值200  异步模式下,一次批量提交消息的条数,但如果间隔时间超过 queue.buffering.max.ms 的值,不管有没有达到批量提交的设值,都会进行一次提交
9:partitioner.class:自定义分区算法
在一个kafka集群中,每一个节点称为一个broker,所以进入zk通过/ls命令查看根目录有个brokers目录(kafka默认安装配置文件是放在zk根目录,我更喜欢入在自定义目录下),这里保存了当前kafka集群在正在运行的节点名:
  storm集成kafka
只有将所有消息最大限度平均的发送到每个broker上去,才能达到最好的集群效果。那么kafka是通过什么来保证这一点的呢。
kafka消息类KeyedMessae中有一个方法,参数分别为将要发送消息的队列,和消息KEY,VALUE。通过对KEY的HASH值求broker的个数求模,将会得到broker值,它就是将接收消息的节点。
可以自定义分区实现类,并在属性中指明:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SendPartitioner implements Partitioner{

    public SendPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            return Math.abs(key.hashCode() % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }

}
View Code

相关文章: