/*今晚整理了一点kafka的知识,很开心!
*当然要感谢教我这些知识的大神!*/
kafka分布式发布订阅消息系统
  • 数据的传输方式
1.socket
c/s交互模式,服务器提供服务,通过ip地址和客户端进行访问;客户机通过连接服务器指定的端口进行消息交互。其中传输协议可以是tcp/UDP 协议。而服务器约定了请求报文格式和响应报文格式。
优点: 1.易于编程,目前java提供了多种框架,屏蔽了底层通信细节以及数据传输转换细节。
           2.容易控制权限。通过传输层协议https,加密传输的数据,使得安全性提高 
           3.通用性比较强,无论客户端是.net架构,java,python 都可以。尤其是webservice规范,使得服务变得通用 
缺点: 1.服务器和客户端必须同时工作,当服务器端不可用的时候,整个数据交互是不可进行。 
           2.当传输数据量比较大的时候,严重占用网络带宽,可能导致连接超时。使得在数据量交互的时候,服务变的很不可靠。
2.ftp/文件共享服务器方式
系统A和系统B约定文件服务器地址,文件命名规则,文件内容格式等内容,通过上传文件到文件服务器进行数据交互。
优点:1.在数据量大的情况下,可以通过文件传输,不会超时,不占用网络带宽。
          2.方案简单,避免了网络传输,网络协议相关的概念。
 缺点: 1.不太适合做实时类的业务 
            2.必须有共同的文件服务器,文件服务器这里面存在风险。因为文件可能被篡改,删除,或者存在泄密等。
            3.必须约定文件数据的格式,当改变文件格式的时候,需要各个系统都同步做修改。

3.数据库共享数据方式
系统A和系统B通过连接同一个数据库服务器的同一张表进行数据交换。当系统A请求系统B处理数据的时候,系统A Insert一条数据,系统B select 系统A插入的数据进行处理。 
优点: 
    1.相比文件方式传输来说,因为使用的同一个数据库,交互更加简单。 
    2.由于数据库提供相当做的操作,比如更新,回滚等。交互方式比较灵活,而且通过数据库的事务机制,可以做成可靠性的数据交换。 
缺点: 
    1.当连接B的系统越来越多的时候,由于数据库的连接池是有限的,导致每个系统分配到的连接不会很多,当系统越来越多的时候,可能导致无可用的数据库连接。 
    2.一般情况,来自两个不同公司的系统,不太会开放自己的数据库给对方连接,因为这样会有安全性影响。

4.消息中间件方式
Java消息服务(Java Message Service)是message数据传输的典型的实现方式。系统A和系统B通过一个消息服务器进行数据交换。
系统A发送消息到消息服务器,如果系统B订阅系统A发送过来的消息,消息服务器会消息推送给B。双方约定消息格式即可。目前市场上有很多开源的jms消息中间件,比如 ActiveMQ, OpenJMS 。
 优点: 
    1.由于JMS定义了规范,有很多的开源的消息中间件可以选择,而且比较通用。接入起来相对也比较简单 
    2.通过消息方式比较灵活,可以采取同步,异步,可靠性的消息处理,消息中间件也可以独立出来部署。
缺点: 
    1.学习JMS相关的基础知识,消息中间件的具体配置,以及实现的细节对于开发人员来说还是有一点学习成本的 
    2.在大数据量的情况下,消息可能会产生积压,导致消息延迟,消息丢失,甚至消息中间件崩溃。
消息中间件的优势
系统解耦
削峰填谷
数据交换
异步通知
定时任务
  • ​二、消息中间件--Kafka
kafka--分布式消息系统01
/*
*Consumer(消费者)和broker都要在zookeeper注册*/
Kafka的特征
持久性消息 要从大数据得到有价值的信息,我们都承担不起丢失任何的信息。Apache Kafka采用O disk structure,提供稳定的 TB级消息存储。
高吞吐量 Kafka设计工作在商用硬件上,提供每秒百万的消息
分布式 Kafka支持在Kafka服务器上消息分区和整个集群上的分布式的消费,并且维护每个分区的排序。
多客户端支持 Kafka系统支持不同客户端的集成,包括Java、.NET、PHP、Ruby和Python
实时消息由生产者线程生成出来应该立刻被消费者线程看到,这个特征的关键是基于事件的系统例如Complex Event Processing(CEP)系统。
Broker 
    Kafka集群包含一个或多个服务器,这种服务器被称为broker 
Topic 
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic(主题)。(每个消费者要指定主题才能拿到相应的消息
(物理上不同topic的消息分开存储,逻辑上 一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
/*可以看成一个队列queue,生产者发送消息到队列的不同地方,消费者指定不同的topic,取相应的消息*/
Partition 
    partition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition 对应于一个文件夹,该文件夹下存储该partition的数据和索引文件 
Producer 
    负责发布消息到Kafka broker 
Consumer 
    消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定 group name则属于默认的group)。
    使用consumer high level API时,同一topic的一条消息只能被同一个 consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

kafka的设计基础
Producer发布消息到Kafka topic,topic由Kafka broker作为Kafka服务器创建。Consumer订阅Kafka主题获得消息。这个架构包括3个部分—producers, Kafka broker和consumer运行在不同的机器上。每个consumer是一个进程,这些进程被组织成consumer groups。
 Kafka的设计,任何消费后的消息的状态由消息的消费者维护,Kafka的broker不会维护是谁消费了这个消息。 
1.Kafka重要的基础是消息的缓存和存储是在文件系统上。在Kafka中,数据会立刻写入到OS内核页,缓存和刷新数据到磁盘上都是可以配置的。 
2.Kafka提供了更长的保留信息的时间,如果需要允许重新消费 
3.Kafka使用组消息减少网络开销 
4.不像是大多数消息系统,消费后的元数据保存在服务层,在Kafka中,消费后的消息状态保存在消费层。这也解决了如下问题:
    1).由于失败丢失消息;
    2).同一个消 息传递多次 
5.在Kafka中,生产者和消费者是传统的“推拉”模型,生产者推送消息到Kafka broker,然后消费者从broker拉去消息 
6.Kafka没有任何master的概念,对待所有的broker都是一样的。这一点有利于添加和删除Kafka broker,broker的元数据由Zookeeper维护,然后共享给生产者和消费者。 
7.基于ZooKeeper负载均衡允许生产者动态的发现broker。生产者维护broker连接的池,持续的通过ZooKeeper的回调方法更新。但是在0.8.x之后,负载均衡是通过Kafka元数据API来完成的,ZooKeeper只是定义可用的broker列表。 
8.生产者也选择同步或异步方式发送消息到broker
kafka的分区策略:
消息分区策略是在kafka的broker中进行的。消息怎么分区是由生产者决定的,broker以这些消息到达的顺序来存储。在Kafka的 broker中可以配置每个主题分区的个数。 Kafka复制是一个比较重要的特征。尽管Kafka是高扩展,较好的消息持久性和集群的高可用,复制保证了当broker是失效时消息一样可 以发布和消费。复制中,一个消息的每个分区都有n份,可以承受n-1次失效,其中一个副本作为leader。ZooKeeper保存着leader副本的信息和当前同步的副本(leader副本维护一个所有同步副本的列表)。 每个副本都存储着消息本地日志和偏移量,定期的同步到磁盘。这个过程也保证了要么消息写到所有的副本中,要么一个都没有写入。 如果leader副本失效,要么将消息写到本地日志,要么在发送消息到生产者确认之前,一个消息重新由生产者发送给新的leader broker。 选择新的leader副本的过程是这样的:所有的同步副本注册到ZooKeeper中。最前注册的副本会成为新lead副本,其他的副本会变成从副本(follower)。


Kafka提供了以下复制模式:
 同步复制:生产者首先从ZooKeeper识别leader副本,然后发布消息。在消息发布后,它会被写入到leader副本的日志中,然后所有的从副本(follower)通过一个通道开始拉取消息,消息的顺序是确定的。一旦消息写入到各自的日志中,每个从副本发送一个确认到leader副本。一旦复制完成,所有的确认都收到了,lead副本发送一个确认到生产者。在消费者端,所有消息的拉取(pull)都由leader副本完成。
异步复制:与同步复制唯一的不同是,leader副本不会等待从副本的确认。不好的一方面就是,broker失效的时候,不会保证消息的传送。


安装和部署
单节点--单broker集群
1.解压,配置环境变量
2.修改配置文件:
    zookeeper.properties:
        dataDir=/home/yanglei/kafka/zookeeper(zookeeper快照存储的数据文件夹)
        clientPort=2181(监听客户端请求的端口)
    启动kafka自带的zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
  (启动一个本地的zookeeper实例)
3.新开启一个终端,修改配置文件:
    server.properties:
        broker.id=0(broker编号,每个必须是唯一整型值)
        log.dir=/home/yanglei/kafka/logs(存储日志文件)
        zookeeper.connect=localhost:2181(zookeeper连接)
    启动kafka服务:bin/kafka-server-start.sh config/server.properties
4.创建kafka主题,一个分区一个副本:
    kafka-topics.sh --zookeeper master:2181 --create --topic kafkatopic --partitions 1 --replication-factor 1
查看创建出来的主题的命令:bin/kafka-topics.sh --zookeeper master:2181 --describe --topic kafkatopic
5.新开启一个终端,启动生产者发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
    输出文字:This is single broker
Kafka提供命令行生产者客户端的方式接收输入,然后把他们作为消息发布 到Kafka集群。默认情况下,每一行都看成一个新的消息。
启动生产者命令行客户端,broker-list和topic参数是必须的。
broker-list指定要连接的broker<node_address:port>,这里是 localhost:9092。主题是kafkatopic,主题的名字是必须要有的,这样可以 将消息发送到指定的消费者组。

6.启动消费者获取消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
在窗口可以看到消息


单节点--多broker集群
1.复制配置文件:
    cp server.properties config/server-1.properties 
    cp server.properties config/server-2.properties
    启动kafka自带的zookeeper:zookeeper-server-start.sh config/zookeeper.properties 
    修改配置文件:
    server-1.properties
        brokerid=1 
        port=9092 
        log.dir=/tmp/kafka8-logs/broker1
    server-2.propertie
        brokerid=2 
        port=9093 
        log.dir=/tmp/kafka8-logs/broker2
    
2.启动kafka服务
    kafka-server-start.sh config/server.properties 
    kafka-server-start.sh config/server1.properties 
    kafka-server-start.sh config/server2.properties
3.创建kafka主题,两个分区两个副本
    kafka-topics.sh –zookeeper localhost:2181 -partitions 2 --replication-factor 2 --create --topic othertopic
4.启动生产者发送消息
    kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic othertopic
    在窗口输入文字
5.启动消费者获取消息:
    kafka-console-consumer.sh –zookeeper localhost:2181 --topic othertopic --from-beginning

多节点--多broker集群
1.首先启动zookeeper服务
然后在两台机器Node1和Node2上,分别部署了两个broker,Zookeeper使用的是单独的ZK集群。
Node1节点:  cp server.properties server1.properties 
cp server.properties server2.properties 
vi server1.properties 修改以下参数: 
broker.id=1 7.port=9091 
host.name=172.16.212.17 
log.dirs=/tmp/kafka-logs/broker1/ 
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181 
vi server2.properties 修改以下参数: 
broker.id=2 14.port=9092 
host.name=172.16.212.17 
log.dirs=/tmp/kafka-logs/broker2/ 
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
Node2节点: cp server.properties server3.properties 
cp server.properties server4.properties 
vi server1.properties 修改以下参数: 
broker.id=3 
port=9091 
host.name=172.16.212.102 
log.dirs=/tmp/kafka-logs/broker3/ 
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181 
vi server2.properties 修改以下参数: 
broker.id=4 
port=9092 
host.name=172.16.212.102 
log.dirs=/tmp/kafka-logs/broker4/ 
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
2.在两个node上分别启动kafka服务
Node1节点: cd $KAFKA_HOME/bin 
./kafka-server-start.sh $KAFKA_HOME/config/server1.properties 
./kafka-server-start.sh $KAFKA_HOME/config/server2.properties
Node2节点: cd $KAFKA_HOME/bin 
./kafka-server-start.sh $KAFKA_HOME/config/server3.properties 
./kafka-server-start.sh $KAFKA_HOME/config/server4.properties
3.创建topic(创建一个就行)
./kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 2 --topic text123
查看topic:./kafka-topics.sh --describe --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic text123
4.模拟producer发送消息(先启动consumer再发送消息,不然启动之前的消息收不到)
找一个node节点:
./kafka-console-producer.sh --broker-list 172.16.212.17:9091,172.16.212.17:9092,172.16.212.102:9091,172.16.212.102:9092 -topic text123
输出一些消息:hello,world
5.模拟consumer接受消息:
./kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic text123 --from-beginning This is Kafka producer
在窗口能看到消息
6.删除topic
cd $KAFKA_HOME/bin 
./kafka-topics.sh --delete --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic text123

停止kafka:./kafka-server-stop.sh 或者找到kafka的进程,直接kill掉即可。


三.核心概念
push and pull
作为一个messaging system(信息系统),Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从 broker pull消息。push模式和pull模式各有优劣。 push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度 传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根 据consumer的消费能力以适当的速率消费消息。

topic & partition
为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹 下存储这个partition的所有消息和索引文件。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均 匀分布到不同的partition里,这样就实现了水平扩展。 
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。 paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现【kafka.producer.Partitioner】接口。

对于传统的message queue(消息队列)而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为 磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件 大小。例如可以通过配置【$KAFKA_HOME/config/server.properties】让Kafka删除一周前的数据,也可通过配置让Kafka在partition文 件超过1GB时删除旧数据。

Consumer group
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer 实例消费。(不同consumer group可以同时消费同一条消息)
很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的 长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次 的语义,Kafka保证同一个consumer group里只有一个consumer会消费这一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。 实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理 系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。

/*kafka的一段代码,作用忘了*/
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSSC {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("KafkaSSC").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("ERROR")

// Create direct kafka stream with brokers and topics
val topics = Array("sparktopic").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> "172.16.11.46:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1L)).reduceByKey(_+_)
wordCount.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
}

}

































相关文章: