kafka

介绍

kafka是一个分布式数据发布订阅平台。

Kafka入门&介绍part1

特点

1. 发布/订阅:类似消息队列或企业消息总线
2. 存储数据:容错、可持久化
3. 处理数据:实时处理

Kafka入门&介绍part1

应用

消息系统
传统消息系统有两种模式:队列和发布订阅。队列就是消费者按序读取记录,发布订阅就是记录被广播至所有消费者。
这两种模式都有各自优缺点。队列的优点是允许记录分发在多个消费者,加快消息处理速度,但是消息不能并发读取,只能
一个一个来。发布订阅允许你广播消息至多个处理器,但是由于每个消息都传递给每个订阅者,因此无法扩展处理。

kafka的消费者组可以解决上面的问题。kafka的消费者组概念概括了这两个概念。与队列一样,使用者组允许将处理划分为一组
进程(使用者组的成员)。与发布-订阅一样,Kafka允许您向多个消费者组广播消息。kafka模型的优点是,每个主题都具有这些
属性——它可以扩展处理,而且是多租户。同时kafka比传统消息系统有更强的顺序保证。

概念

1. kafka作为集群可以运行在多个服务器
2. kafka集群存储记录目录的组件叫topics
3. 每条记录(record)包含key、value和timestamp

Kafka入门&介绍part1

核心API

Producer API: 允许应用发布记录流到一个或多个kafka主题
Consumer API: 允许应用订阅一个或多个主题,并处理记录流
Stream API: 允许应用作为流处理器,从主题消费数据流,然后往其他主题设置输出数据流
Connector API: 允许构建和运行可重用的生产者和消费者(如一个关系型数据库的connector可以监听表的每次改变)
AdminClient API: 管理和监控topic、broker和其他组件

Kafka入门&介绍part1

主题和日志

主题就是一个类别(或目录)记录哪些记录被发布。kafka主题是多订阅者,也即是一个主题可以有0个、1个或多个
消费者订阅主题的数据(即记录)。对于每个主题,kafka cluster维护一个分区日志。每个分区是一个有序的只能追加的有结构
的日志。分区的每条记录都分配一个序号(offset)来唯一标识。

kafka cluster持久化所有发布的记录(不管记录是否被消费),这些记录会设置保留期限。比如如果保留期限为2天,那么2天内记录
是可被消费的,2天后就被丢弃了(释放空间)。Kafka的性能相对于数据大小实际上是恒定的,因此长时间存储数据不成问题。

实际上每个消费者都是基于offset读取记录的,所以消费者可以控制消费记录的顺序(比如可以重置旧offset读前面或后面的记录)。
这些特性决定了消费者加入或退出不会影响集群中其他的消费者。

日志可以分区,不会限制于单台服务器的限制。一个主题可能有多个分区,所以它可以存储更多数据。

Kafka入门&介绍part1

分布式

在kafka cluster中日志分区分布在服务器中的,每个服务器都可能处理相同的分区,为了容错
分区可以在配置的服务器间复制。每个分区都有个leader服务器,然后有多个follower服务器。leader
处理所有的请求的读写,follower进行数据同步;如果leader挂了,follower会自动成为新的leader。每个服务器
是部分分区的leader,其他分区的follower,以实现均衡负载。

地理复制

kafka的MirrorMaker为集群提供了基于地理的复制,可以跨区域跨数据中心,也可以用来做备份和容灾。

生产者

生产者推送数据至主题,并且负责选择某条记录分配在哪个主题下面的分区。这可以实现轮询达到负载均衡,也可以根据record的
某些key进行分配至不同分区。

消费者

消费者会自动打标签进入一个消费者组,每个发布至主题的记录被传递至每个订阅消费者组的消费者。消费者实例可以分散至
不同处理器或不同机器。如果所有消费者实例都在同一消费者组,那么记录会在消费者实例间做负载均衡。如果所有消费者都
在不同的消费者组,那么每条记录会广播这所有消费者处理器。

Kafka入门&介绍part1

多租户

Kafka部署支持多租户。通过配置哪些主题可以启用多租户,还有对配额的操作支持。
管理员可以定义和执行请求的配额,以控制客户端使用的代理资源

保证

1. 生产者发送的消息会按发送顺序排在分区(生产者发送了M1,然后M2,那么M1在日志中有更低的offset)
2. 消费者按日志存储顺序读取记录
3. 如果一个主题在N个服务器复制,那么最多容忍N-1个服务挂掉

快速开始

启动服务

# 启动zookeeper
zookeeper-server-start.bat ..\..\config\zookeeper.properties

# 启动kafka
kafka-server-start.bat ..\..\config\server.properties

创建主题

# 创建主题
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic ktest

# 查看主题
kafka-topics.bat --list --zookeeper locaohost:2181

发送消息

kafka-console-producer.bat --broker-list localhost:9092 --topic ktest
> this is a kafka message
> this is another message

消费消息

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ktest --from-beginning

集群模式

配置
复制config\server.proerpties 2份,分别改名为server2.proerperties和server3.properties

server2.properties
    broker.id = 2
    listeners = PLAINTEXT://:9093
    log.dirs = /tmp/kafka-logs-2
server3.properties
    broker.id = 3
    listeners = PLAINTEXT://:9094
    log.dirs = /tmp/kafka-logs-3
启动
kafka-server-start.bat ..\..\config\server2.properties
kafka-server-start.bat ..\..\config\server3.properties
其他操作
# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

# 查看topic状态
kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
kafka-topics.bat --describe --zookeeper localhost:2181 --topic ktest

通过查看topic状态可以发现topic是否复制到多节点,leader节点和follower节点等

应用场景

消息系统

kafka最常见就是应用与消息系统(解耦数据处理),相比于传统的消息系统(ActiveMQ和RabbitMQ),kafka有更高的吞吐量,
内置的分区,消息复制和容错性。一般来说消息系统的吞吐量不会很大,但是需要kafka提供强持久化来保证消息不丢失。

网站活动追踪

kafka也经常通过实时的发布订阅,重建用户活动轨迹pipeline。这意味着网页浏览、搜索和用户其他行为都会发布到中央主题,这些
可以满足实时监控,或者将数据导入hadoop出用户行为报表等。

数据统计

kafka经常用于数据统计,可以聚合来自分布式应用程序的统计数据,产生操作数据的集中式概要。

日志收集

许多开发用kafka来收集日志。日志收集通常就是收取物理日志文件然后将他们存放在一个中心系统(如文件系统、HDFS等)处理。kafka
提取日志文件的摘要,然后将摘要作为消息流发送和处理。这更好的支持低延迟处理和更容易支持多数据源、分布式数据处理。对比日志
处理系统(如Scribe、Flume),kafka提供相同的性能,更强的持久化,以及端对端更低延迟。

流式处理

kafka也可以以流式方式处理数据,原始数据由topic消费,然后聚合、增强以及转换为其他topic或者远程程序处理。比如:推荐的文章
从RSS获取,然后发布至""acticles"的主题,远端处理程序可以复制这些内容然后发布到新的主题,最终程序会将这些文章推荐给用户。
类似于上面的处理流程线可以为每一个主题创建实时的数据流。在0.10.0版本后kafka提供了kafka Streams(一个更轻量级但是更强大的
API包)。

事件源

事件源是基于应用的可以基于时间线的事件处理链,kafka支持存储很大数据级日志,可以支持它更好的做事件源处理。

提交日志

kafka也可以作为分布式系统的日志提交记录,这些日志记录可以作为结点间复制的根据,或者同步服务挂掉结点的数据等。

kafka生态

kafka生态包括流式处理系统、Hadoop集成、监控和发布等工具,可参考https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

配置

broker配置

- 核心重要配置
broker.id: 唯一标识
log.dirs: 日志存放路径
zookeeper.coonect: zookeeper地址

更多配置见http://kafka.apache.org/documentation/#brokerconfigs

broker配置更新

从1.1开始,kafka broker的部分配置可以动态修改(不重启)

- 配置中各属性含义
read-only: 需要重启更新
pre-broker: 可以在每个broker动态更新
cluster-wide: 集群更新
-- 修改broker配置
kafka-configs.bat --bootstrap-server localhost:9093 --entity-type brokers --entity-name 1 --alter --add-config log.cleaner.threads=10

-- 查看broker配置
kafka-configs.bat --bootstrap-server localhost:9093 --entity-type brokers --entity-name 2 --describe
kafka-configs.bat --bootstrap-server localhost:9093 --entity-type brokers --entity-default --describe

-- 删除broker配置
kafka-configs.bat --bootstrap-server localhost:9093 --entity-type brokers --entity-name 2 --alter --delete-config log.cleaner.threads

-- 新增broker配置
kafka-configs.bat --bootstrap-server localhost:9093 --entity-type brokers --entity-name 2 --alter --add-config log.cleaner.threads=2

动态的配置(pre-broker和cluster-wide)存放在zookeeper,静态配置存储在server.properties

密码配置
password.encoder.secret
SSL配置
ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password

ssl.truststore.type
ssl.truststore.location
ssl.truststore.password
默认主题配置
log.segment.bytes
log.roll.ms
log.roll.hours
log.roll.jitter.ms
log.roll.jitter.hours
log.index.size.max.bytes
log.flush.interval.messages
log.flush.interval.ms
log.retention.bytes
log.retention.ms
log.retention.minutes
log.retention.hours
log.index.interval.bytes
log.cleaner.delete.retention.ms
log.cleaner.min.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleanup.policy
log.segment.delete.delay.ms
unclean.leader.election.enable
min.insync.replicas
max.message.bytes
compression.type
log.preallocate
log.message.timestamp.type
log.message.timestamp.difference.max.ms
日志配置
log.cleaner.threads
log.cleaner.io.max.bytes.per.second
log.cleaner.dedupe.buffer.size
log.cleaner.io.buffer.size
log.cleaner.io.buffer.load.factor
log.cleaner.backoff.ms
更新线程配置
num.network.threads
num.io.threads
num.replica.fetchers
num.recovery.threads.per.data.dir
log.cleaner.threads
background.threads
连接配置
max.connections.per.ip
max.connections.per.ip.overrides
监听器配置
listeners
advertised.listeners
listener.security.protocol.map

inter.broker.listener.name
inter.broker.security.protocol

topic配置

-- 新增topic配置
kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flus.messages=1

-- 修改topic配置
kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.btyes=128000

-- 更新topic配置
kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000

-- 查看topic配置
kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

-- 删除topic配置
kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

更多配置见http://kafka.apache.org/documentation/#topicconfigs

producer配置

详见http://kafka.apache.org/documentation/#producerconfigs

consumer配置

详见http://kafka.apache.org/documentation/#consumerconfigs

connect配置

详见http://kafka.apache.org/documentation/#connectconfigs

streams配置

详见http://kafka.apache.org/documentation/#streamsconfigs

adminClient配置

详见http://kafka.apache.org/documentation/#adminclientconfigs

相关文章: