kafka的概念
kafka在设计之初的时候,开发人员们在除了消息中间件以外,还想吧kafka设计为一个能够存储数据的系统,有点像常见的非关系型数据库,比如说NoSql等。除此之外还希望kafka能支持持续变化,不断增长的数据流, 可以发布和订阅数据流,还可以对于这些数据进行保存。
也就是说kafka的本质是一个数据存储平台、流平台,只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。
而且kafka在设计之初就是采用分布式架构设计的,基于集群的方式工作,且可以自由伸缩,所以kafka构建集群非常简单。
kafka各个组件的概念
- Broker : 和AMQP里协议的概念一样, 就是消息中间件所在的服务器
- Topic(主题) : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition(分区) : Partition是物理上的概念,体现在磁盘上面,每个Topic包含一个或多个Partition.
- Producer : 负责发布消息到Kafka broker
- Consumer : 消息消费者,向Kafka broker读取消息的客户端。
- Consumer Group(消费者群组) : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
- offset 偏移量: 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字
如果有消费者群组,一个消费者可以去消费多个分区的消息,但是具体的某一个分区只能被一个消费者消费
偏移量递增是基于分区的,每个分区的偏移量都是不一样的
kafka消息发送的时候,考虑到性能可以采用打包方式发送,也就是说传统的消息是一条一条发送,现在可以先把需要发送的消息缓存在客户端,等到达一定数值时,再一起打包发送,而且还可以对发送的数据进行压缩处理,减少在数据传输时的开销。
kafka的优缺点
优点:
- 基于磁盘的数据存储
- 高伸缩性
- 高性能
- 应用场景 : 收集指标和日志 提交日志 流处理
缺点:
- 运维难度大
- 偶尔有数据混乱的情况
- 对zookeeper强依赖
- 多副本模式下对带宽有一定要求
kafka安装和启动
kafka安装的话,直接 从官网下载压缩包下来解压就可以了,下载地址:https://kafka.apache.org/downloads
注意的是,启动kafka要先启动zookeeper,kafka默认自带了zookeeper,可以启动他自带的,也可以自己另外使用。
启动kafka需要执行kafka-server-start.bat文件,然后需要传入一个路径参数,就是你server.config文件的地址, 一般情况下传入../../config/server.properties即可
刚刚提到的zookeeper,kafka默认的zookeeper启动的话,启动zookeeper-server-start.bat文件即可,同样要传入路径参数:../../config/zookeeper.properties
//启动kafka自带的zookeeper
zookeeper-server-start.bat ../../config/zookeeper.properties
//启动kafka
kafka-server-start.bat ../../config/server.properties
kafka搭建集群的话也比较简单,直接把压缩包复制几份,然后更改config目录下的server.properties文件里几个参数即可
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
server.properties参数解释
log.dirs //日志文件存储地址,可以设置多个,多个之间用逗号隔开
num.recovery.threads.per.data.dir //用来读取日志文件的线程数量,对应每一个log.dirs若此参数为2,log.dirs为2个目录 那么就会有4个线程来读取
auto.create.topics.enable //是否自动创建tiopic
num.partitions // 创建topic的时候自动创建多少个分区 (可以在创建topic的时候手动指定)
log.retention.hours // 日志文件保留时间 超时即删除
log.retention.bytes // 日志文件最大大小
log.segment.bytes // 当日志文件达到一定大小时,开辟新的文件来存储(分片存储)
log.segment.ms // 同上 只是当达到一定时间时 开辟新的文件
message.max.bytes // broker能接收的最大消息大小(单条) 默认1M
kafka基本管理操作命令
//列出所有主题
kafka-topics.bat --zookeeper localhost:2181 --list
//列出所有主题的详细信息
kafka-topics.bat --zookeeper localhost:2181 --describe
//创建主题 主题名 my-topic,1副本,8分区
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 8 --topic my-topic
//增加分区,注意:分区无法被删除,只能删除主题
kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
//删除主题
kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic
//列出消费者群组(仅Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list
//列出消费者群组详细信息(仅Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 群组名
kafka示例代码
pom.xml文件添加如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
生产者代码:
package com.luban.kafka.product;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record=new ProducerRecord<>("my-topic","test-key","hello kafka");
/**
* kafka可以同步接受和异步接受消息发送成功的返回,下面会有演示
*/
//同步接受发送消息的返回
Future<RecordMetadata> send = kafkaProducer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
//异步接受消息发送成功的返回
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e){
System.out.println(e);
}
if(null != recordMetadata){
System.out.println(recordMetadata);
}
}
});
kafkaProducer.flush();
kafkaProducer.close();
}
}
消费者代码:
package com.luban.kafka.cousmer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Collections;
import java.util.Properties;
public class Cousmer {
public static void main(String[] args) {
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id","1111");
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
while (true){
ConsumerRecords<String, String> poll = kafkaConsumer.poll(5000);
for (ConsumerRecord<String,String> consumerRecords :poll){
System.out.println(consumerRecords.partition()+"====" + consumerRecords.offset() + consumerRecords.value());
}
}
}
}