kafka→分布式发布-订阅消息系统

一、kafka简介

1.kafka是什么?

  • 最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。
  • 是一个分布式的,可划分的,多订阅者,冗余备份的持久性的服务。
  • 同时为发布和订阅提供高吞吐量。每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)。
  • kafka的优点
  • 降低系统组网复杂度。
  • 降低编程复杂度,各个子系统不再是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速数据总线的作用。
  • 分布式系统,易于向外扩展。所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
  • 可进行持久化操作。将消息持久化到磁盘,可以防止数据丢失。

2.kafka核心概念

  • 消息的发布(publish)称做producer。
  • 消息的订阅(subscribe)称做consumer。
  • 中间的存储阵列称做broker。
  • 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。
  • producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。

3.kafka的应用场景?

  • 处理活跃的流式数据。
  • 同时处理在线应用(消息)和离线应用(数据文件,日志)。
  • 大数据系统中的数据流转。

4.其他消息队列介绍

  • RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。
  • ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级/复杂的队列,但是技术也复杂,并且只提供非持久性的队列。
  • ActiveMQ:Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列。
  • Redis:是一个key-Value的NOSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受。

5.kafka版本下载

  • http://kafka.apache.org/downloads
  • 下载最新版本稳定版的安装包,如下图所示。

CenOS7下kafka集群搭建

二、kafka安装

1.软件环境

  • CentOS7
  • JDK1.8
  • kafka10.1.1

2.安装准备

安装zookeeper单机版并启动

3.kafka安装和启动

  • 输入 mkdir /usr/local/kafka建立kafka目录
  • CenOS7下kafka集群搭建
  • 将下载好的tar包放到该目录
  • CenOS7下kafka集群搭建
  • 上传tar包到 /usr/local/kafka 并解压,进入解压后的目录

 CenOS7下kafka集群搭建

  • 自定义一个放kafka日志的目录,例如我这里在kafka解压目录下新建一个log目录

输入mkdir /usr/local/kafka/log

   CenOS7下kafka集群搭建

  • 输入cp /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties.bak
  • vi  /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties

输入以下内容:

# broker的全局唯一编号,不能重复

broker.id=0

# 用来监听链接的端口,producer或consumer将在此端口建立连接

port=9092

# 处理网络请求的线程数量

num.network.threads=3

# 用来处理磁盘IO的线程数量

num.io.threads=8

# 发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小

socket.request.max.bytes=104857600

# kafka运行日志存放的路径

log.dirs=/usr/local/kafka/log

# topic在当前broker上的分片个数

num.partitions=2

# 用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

# segment文件保留的最长时间,超时将被删除

log.retention.hours=168

# 滚动生产新的segment文件的最大时间

log.retention.hours=168

# zookeeper集群的地址,可以是多个,多个之间用逗号分割

zookeeper.connect=master:2181,node1:2181,node2:2181

#ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大zookeeper.connection.timeout.ms=6000

CenOS7下kafka集群搭建

  • 在master主机上,任意目录输入

scp -r /usr/local/kafka/ [email protected]:/usr/local/将文件分发到节点node1上

scp -r /usr/local/kafka/[email protected]:/usr/local/将文件分发到节点node2上

CenOS7下kafka集群搭建

  • 再次修改配置文件

依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。

  • 输入cp  /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties.bak
  • vi  /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties

输入以下内容:

再次修改配置文件

输入cd /usr/local/kafka/kafka_2.11-0.10.1.1/config/进入config目录下 vi  server.properties

依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。

CenOS7下kafka集群搭建

 

  • 依次在各节点上启动kafka

进入kafka的bin目录,cd /usr/local/kafka/kafka_2.11-0.10.1.1/bin/,输入 ./kafka-server-start.sh /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties  启动kafka

  • 克隆一个会话,可输入 ps aux|grep kafka 查看进程

4.kafka测试

  • cd /usr/local/kafka/kafka_2.11-0.10.1.1/bin/
  • 进入kafka的bin目录,输入 ./kafka-console-producer.sh --broker-list master:9092 --topic test  运行producer
  • 再克隆一个会话,进入kafka的bin目录,输入 ./kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning  运行consumer
  • 在producer窗口输入一些内容,然后查看consumer窗口,可以查看到输入的内容,证明kafka连通性没问题

相关文章:

  • 2022-02-09
  • 2022-12-23
  • 2021-10-16
  • 2022-01-07
猜你喜欢
  • 2021-09-15
  • 2021-04-22
  • 2021-11-29
  • 2021-06-15
  • 2022-02-01
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案