Apache Kafka系列(一) 起步

摘要:

  1.Apache Kafka基本概念

  2.Kafka的安装

  3.基本工具创建Topic

 本文基于centos7, Apache Kafka 0.11.0

一、基本概念

  Apache Kafka是一个发布/订阅的消息系统,于2009年源自Linkedin,并与2011年开源。在架构方面,Kafka类似于其他的消息系统(ActiveMQ,RabbitMQ)。但是Kafka有诸多的特性使得越来越流行:

  • Kafka本身支持分布式,很容易横向扩展
  • 高吞吐量,高性能
  • 高容错性,即使宕机

  Kafka在实时数据处理方面的应用场景越来越多

二、安装

  1.安装jdk

[[email protected] /]# yum install java-1.8.0-openjdk.x86_64

     2.下载kafka_2.12-0.11.0.0.tgz到/tmp下

[[email protected] /]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz  -P /tmp

       3.解压压缩文件到/opt目录下

[[email protected] /]# tar -xvzf /tmp/kafka_2.12-0.11.0.0.tgz -C /opt/

       4.解压后的目录结构如下

[[email protected] kafka_2.12-0.11.0.0]# pwd
/opt/kafka_2.12-0.11.0.0
[[email protected] kafka_2.12-0.11.0.0]# ls
LICENSE  NOTICE  bin  config  libs  site-docs
[[email protected] kafka_2.12-0.11.0.0]# 
  • bin:包含Kafka运行的所有脚本,如:start/stop Zookeeper,start/stop Kafka
  • libs:Kafka运行的依赖库
  • config:zookeeper,Logger,Kafka等相关配置文件
  • sit-docs:Kafka相关文档

三、启动Kafka服务器

  Kafka能够以3种方式部署集群

  • 单节点-单Broker集群:只在一个节点上部署一个Broker
  • 单节点-多Broker集群:在一个节点上部署多个Broker,只不过各个Broker以不同的端口启动
  • 多节点-多Broker集群:以上两种的组合,每个节点上部署一到多个Broker,且各个节点连接起来

  本次仅以第一种方式部署。启动Kafka需要两步:

  • 启动ZooKeeper

    Kafka使用ZooKeeper来存储集群元数据和Consumer信息。因此,有两种选项:

    第一,独立部署ZooKeeper应用并配置Kafka集群到该ZooKeeper;

    第二,使用Kafka自带的ZooKeeper

    本文选择使用Kafka自带的ZooKeeper

  •  启动Kafka服务

    3.1 启动ZooKeeper

kafka 学习笔记

[[email protected] kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2017-08-10 14:02:29,426] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
[2017-08-10 14:02:29,491] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-08-10 14:02:29,491] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-08-10 14:02:29,501] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

kafka 学习笔记

    ZooKeeper成功启动,并绑定到端口2181。该端口是ZooKeeper的默认端口,可以通过编辑文件config/zookeeper.properties 中的clientPort来修改监听端口。

  3.2 启动Kafka Broker

kafka 学习笔记

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-server-start.sh config/server.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2017-08-10 14:11:58,741] INFO KafkaConfig values: 
 .....
[2017-08-10 14:12:00,385] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-08-10 14:12:00,385] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
[2017-08-10 14:12:00,386] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

kafka 学习笔记

 

四、使用Kafka

  4.1 创建一个Topic 名称为HelloWorld

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
> --partitions 1 --topic HelloWorld

     校验Topic是否创建成功

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
HelloWorld

     4.2 启动一个Producer并发送消息

   可以使用Kafka命令行客户端(获取标准命令行输入并一Message形式发出)发送消息到Kafka集群。默认情况下,每行会单独算作一次消息发出。下例通过该命令行终端发送消息到HelloWorld这个Topic,命令如下:

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld
>hello world!
>this is the first greating
>

     4.3 启动一个Consumer并接受消息

      跟4.2中类似,同样可以使用Kafka命令行终端来启动一个Consumer(格式化消息到标准输出),命令如下

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topicelloWorld --from-beginning
hello world!
this is the first greating

  

五、结论

  本文展示了如何一步一步安装Kafka到Linux,涵盖如何下载,启动ZooKeeper/Kafka,发送和接受来自Kafka服务器的消息

 

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka命令行工具(Command Line Interface,CLI),下文简称CLI。

1. 启动Kafka

  启动Kafka需要两步:

  1.1. 启动ZooKeeper 

[[email protected] kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties

  1.2. 启动Kafka Server

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-server-start.sh config/server.properties 

2. 列出Topic

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
HelloWorld

3. 创建Topic

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
Created topic "Demo1".

  上述命令会创建一个名为Demo1的Topic,并指定了replication-factor和partitions分别为1。其中replication-factor控制一个Message会被写到多少台服务器上,因此这个值必须小于或者

  等于Broker的数量。

4. 描述Topic

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
Topic:Demo1     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: Demo1    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

5. 发布消息到指定的Topic

kafka 学习笔记

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
>this
>is 
>the 
>firest
>input

kafka 学习笔记

  可以在控制台逐行输入任意消息。命令的终止符是:control + C组合键。

6. 消费指定Topic上的消息

kafka 学习笔记

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1
this
is 
the 
firest
input

kafka 学习笔记

7. 修改Topic

  7.1 增加指定Topic的partition,在第3步中创建的Demo1的partition是1。如下命令将增加10个partition

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 11 --topic Demo1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

  7.2. 删除指定Topic

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic Demo1
Topic Demo1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

    Note中指出该Topic并没有真正的删除,如果真删除,需要把server.properties中的delete.topic.enable置为true

  7.3 给指定的Topic增加配置项,如给一个增加max message size值为128000

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic Demo1 --config max.message.bytes=128000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "Demo1".

    warning中指出该命令已经过期,将来可能被删除,替代的命令是使用kafka-config.sh。新命令如下:

[[email protected] kafka_2.12-0.11.0.0]# bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-type topics --entity-name Demo1 --add-config max.message.bytes=12800
Completed Updating config for entity: topic 'Demo1'.

    需要使用entity-type置为topics,并在entity-name中指定对应的名称

8. 结论

  本文展示了CLI所提供的一些常用的命令,这些基本的命令在运维Kafka过程中很实用。

 

Apache Kafka系列(三) Java API使用

摘要:

  Apache Kafka Java Client API

一、基本概念

  Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如:

    1.创建Topic

    2.罗列出已存在的Topic

    3.对已有Topic的Produce/Consume测试

  跟其他的消息系统一样,Kafka提供了多种不用语言实现的客户端API,如:Java,Python,Ruby,Go等。这些API极大的方便用户使用Kafka集群,本文将展示这些API的使用

二、前提

  • 在本地虚拟机中安装了Kafka 0.11.0版本,可以参照前一篇文章:  Apache Kafka系列(一) 起步
  • 本地安装有JDK1.8
  • IDEA编译器
  • Maven3

三、项目结构

  Maven pom.xml如下:

kafka 学习笔记

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.randy</groupId>
  <artifactId>kafka_api_demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>Maven</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>
</project>

kafka 学习笔记

 

四、源码

  4.1 Producer的源码    

kafka 学习笔记

package com.randy;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * Author  : RandySun
 * Date    : 2017-08-13  16:23
 * Comment :
 */
public class ProducerDemo {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            producer.close();
        }

    }
}

kafka 学习笔记

  可以使用KafkaProducer类的实例来创建一个Producer,KafkaProducer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:

  • bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.110:9092");

   bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一台虚拟机的

           IP地址,9092是所监听的端口

  • key.serializer   &  value.serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消

   息序列化为二进制类型。本例是发送文本消息到Kafka集群,所以使用的是StringSerializer。

  • 发送Message到Kafka集群
   for (int i = 0; i < 100; i++) {
      String msg = "Message " + i;
      producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
      System.out.println("Sent:" + msg);
   }

   上述代码会发送100个消息到HelloWorld这个Topic

  4.2 Consumer的源码

kafka 学习笔记

package com.randy;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Author  : RandySun
 * Date    : 2017-08-13  17:06
 * Comment :
 */
public class ConsumerDemo {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

    }
}

kafka 学习笔记

  可以使用KafkaConsumer类的实例来创建一个Consumer,KafkaConsumer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:

  • bootstrap.servers

    和Producer一样,是指向Kafka集群的IP地址,以逗号分隔。

  • group.id

     Consumer分组ID

  • key.deserializer and value.deserializer

     发序列化。Consumer把来自Kafka集群的二进制消息反序列化为指定的类型。因本例中的Producer使用的是String类型,所以调用StringDeserializer来反序列化

  Consumer订阅了Topic为HelloWorld的消息,Consumer调用poll方法来轮循Kafka集群的消息,其中的参数100是超时时间(Consumer等待直到Kafka集群中没有消息为止): 

kafka 学习笔记

        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

kafka 学习笔记

五、总结

  本文展示了如何创建一个Producer并生成String类型的消息,Consumer消费这些消息。这些都是基于Apache Kafka 0.11.0 Java API。

 

Apache Kafka系列(四) 多线程Consumer方案

本文的图片是通过PPT截图出的,读者如果修改意见请联系我

一、Consumer为何需要实现多线程

  假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息。该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用API写入消息到Kafka Cluster中,然后消息的订阅者通过Consumer读取消息,刚开始的时候系统架构图如下:

kafka 学习笔记

          但是,随着用户数量的增多,通知的数据也会对应的增长。总会达到一个阈值,在这个点上,Producer产生的数量大于Consumer能够消费的数量。那么Broker中未消费的消息就会逐渐增多。即使Kafka使用了优秀的消息持久化机制来保存未被消费的消息,但是Kafka的消息保留机制限制(时间,分区大小,消息Key)也会使得始终未被消费的Message被永久性的删除。另一方面从业务上讲,一个消息通知系统的高延迟几乎算作是废物了。所以多线程的Consumer模型是非常有必要的。

二、多线程的Kafka Consumer 模型类别

  基于Consumer的多线程模型有两种类型:

  • 模型一:多个Consumer且每一个Consumer有自己的线程,对应的架构图如下:

                         kafka 学习笔记

 

  • 模型二:一个Consumer且有多个Worker线程

                         kafka 学习笔记

     两种实现方式的优点/缺点比较如下:

名称 优点 缺点
模型一

1.Consumer Group容易实现

2.各个Partition的顺序实现更容易

1.Consumer的数量不能超过Partition的数量,否则多出的Consumer永远不会被使用到

2.因没个Consumer都需要一个TCP链接,会造成大量的系统性能损耗

模型二 1.由于通过线程池实现了Consumer,横向扩展更方便

1.在每个Partition上实现顺序处理更困难。

例如:同一个Partition上有两个待处理的Message需要被线程池中的2个线程消费掉,那这两个线程必须实现同步

三、代码实现

3.1 前提

    • Kafka Broker 0.11.0
    • JDK1.8
    • IDEA
    • Maven3
    • Kafka环境搭建及Topic创建修改等请参照本系列的前几篇文章。

 3.2 源码结构

                 kafka 学习笔记

       其中,consumergroup包下面对应的是模型一的代码,consumerthread包下是模型二的代码。ProducerThread是生产者代码。

 3.3 pom.xml

kafka 学习笔记

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.randy</groupId>
  <artifactId>kafka_multithread_consumer_model</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>kafka_multithread_consumer_model Maven Webapp</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka_multithread_consumer_model</finalName>
  </build>
</project>

kafka 学习笔记

 3.4 方案一:Consumer Group

  ProducerThread.java是一个生产者线程,发送消息到Broker

  ConsumerThread.java是一个消费者线程,由于消费消息

  ConsumerGroup.java用于产生一组消费者线程

  ConsumerGroupMain.java是入口类     

3.4.1 ProducerThread.java 

kafka 学习笔记

kafka 学习笔记

package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  11:41
 * Comment :
 */
public class ProducerThread implements Runnable {
    private final Producer<String,String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers,String topic){
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String,String>(properties);

    }

    private static Properties buildKafkaProperty(String brokers){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true){
            String sendMsg = "Producer message number:"+String.valueOf(++i);
            kafkaProducer.send(new ProducerRecord<String, String>(topic,sendMsg),new Callback(){

                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e != null){
                        e.printStackTrace();
                    }
                    System.out.println("Producer Message: Partition:"+recordMetadata.partition()+",Offset:"+recordMetadata.offset());
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}

kafka 学习笔记

 3.4.2 ConsumerThread.java

kafka 学习笔记

kafka 学习笔记

package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  12:03
 * Comment :
 */
public class ConsumerThread implements Runnable {
    private static KafkaConsumer<String,String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers,String groupId,String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers,String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true){
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> item : consumerRecords){
                System.out.println("Consumer Message:"+item.value()+",Partition:"+item.partition()+"Offset:"+item.offset());
            }
        }
    }
}

kafka 学习笔记

3.4.3 ConsumerGroup.java

kafka 学习笔记

kafka 学习笔记

package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:09
 * Comment :
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers,String groupId,String topic,int consumerNumber){
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for(int i = 0; i< consumerNumber;i++){
            ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start(){
        for (ConsumerThread item : consumerThreadList){
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}

kafka 学习笔记

3.4.4 ConsumerGroupMain.java  

kafka 学习笔记

kafka 学习笔记

package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:18
 * Comment :
 */
public class ConsumerGroupMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerGroup consumerGroup = new ConsumerGroup(brokers,groupId,topic,consumerNumber);
        consumerGroup.start();
    }
}

kafka 学习笔记

3.5 方案二:多线程的Consumer

  ConsumerThreadHandler.java用于处理发送到消费者的消息

  ConsumerThread.java是消费者使用线程池的方式初始化消费者线程

  ConsumerThreadMain.java是入口类

3.5.1 ConsumerThreadHandler.java

kafka 学习笔记

kafka 学习笔记

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:29
 * Comment :
 */
public class ConsumerThreadHandler implements Runnable {
    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord){
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:"+consumerRecord.value()+",Partition:"+consumerRecord.partition()+"Offset:"+consumerRecord.offset());
    }
}

kafka 学习笔记

3.5.2 ConsumerThread.java

kafka 学习笔记

kafka 学习笔记

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:42
 * Comment :
 */
public class ConsumerThread {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Threadpool of consumers
    private ExecutorService executor;


    public ConsumerThread(String brokers, String groupId, String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber){
        executor = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true){
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String,String> item : consumerRecords){
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }


}

kafka 学习笔记

3.5.3 ConsumerThreadMain.java

kafka 学习笔记

kafka 学习笔记

package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:49
 * Comment :
 */
public class ConsumerThreadMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;


        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
        consumerThread.start(3);


    }
}

kafka 学习笔记

四. 总结

  本篇文章列举了两种不同的消费者模式。两者各有利弊。所有代码都上传到了https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ,如有疑问或者错误请指正

 

Apache Kafka系列(五) Kafka Connect及FileConnector示例

一. Kafka Connect简介

  Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。

             kafka 学习笔记

如图中所示,左侧的Sources负责从其他异构系统中读取数据并导入到Kafka中;右侧的Sinks是把Kafka中的数据写入到其他的系统中。

二. 各种Kafka Connector

  Kafka Connector很多,包括开源和商业版本的。如下列表中是常用的开源Connector

Connectors References
Jdbc SourceSink
Elastic Search Sink1Sink2Sink3
Cassandra Source1Source 2Sink1Sink2 
MongoDB Source
HBase Sink
Syslog Source
MQTT (Source) Source
Twitter (Source) SourceSink
S3 Sink1Sink2
 

  商业版的可以通过Confluent.io获得

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)转为流数据再写入到Destination(test.sink.txt)中。如下图所示:

          kafka 学习笔记

      本例使用到了两个Connector:

  • FileStreamSource:从test.txt中读取并发布到Broker中
  • FileStreamSink:从Broker中读取数据并写入到test.sink.txt文件中

  其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

  其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

  Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

kafka 学习笔记

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

kafka 学习笔记

 

3.2 运行Demo

  需要熟悉Kafka的一些命令行,参考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

 3.2.1 启动Kafka Broker

[[email protected] bin]# cd /opt/kafka_2.11-0.11.0.0/
[[email protected] kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[[email protected] kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[[email protected] kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &

3.2.2 启动Source Connector和Sink Connector

[[email protected] kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 

3.3.3 打开console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test

3.3.4 写入到test.txt文件中,并观察3.3.3中的变化

[[email protected] kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[[email protected] kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打开的窗口输出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}

3.3.5 查看test.sink.txt

[[email protected] kafka_2.12-0.11.0.0]# cat test.sink.txt 
firest line
second line

 

四. 结论

本例仅仅演示了Kafka自带的File Connector,后续文章会完成JndiConnector,HdfsConnector,并且会使用CDC(Changed Data Capture)集成Kafka来完成一个ETL的例子

 PS:

相比编译过Kafka-Manager都知道各种坑,经过了3个小时的努力,我终于把Kafka-Manager编译通过并打包了,并且新增了Kafka0.11.0版本支持。

附下载地址: 链接: https://pan.baidu.com/s/1miiMsAk 密码: 866q

 

转载:http://www.cnblogs.com/qizhelongdeyang/p/7341954.html

相关文章:

猜你喜欢
  • 2021-06-20
  • 2021-05-25
  • 2022-01-19
  • 2022-01-06
相关资源
相似解决方案