xuxinkun

环境说明

kafka自0.9之后增加了connector的特性。本文主要是搭建一个分布式的kafka connector和broker。

本文用了三台机器进行部署,使用centos 6.6。

hostname ip role
node1 10.8.65.63 zk + kafka broker + schema-registry + kafka connector
node2 10.8.65.60 kafka broker + kafka connector
node3 10.8.65.62 kafka broker + kafka connector

安装

先安装jdk/mysql/confluent等组件

yum install -y wget curl mysql mysql-server lokkit

cd /root

## 安装jdk
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie"  http://download.oracle.com/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.rpm
rpm -ivh jdk-8u91-linux-x64.rpm
## 安装jdbc-mysql-driver
wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile
## 安装confluent
cd /usr/local
wget http://packages.confluent.io/archive/2.0/confluent-2.0.1-2.11.7.tar.gz 
tar xzvf confluent-2.0.1-2.11.7.tar.gz

部署

准备

声明变量

## node1上
export role_num=1
## node2上
export role_num=2
## node3上
export role_num=3

设置hostname和hosts

## 把hostname设置为对应的名字
hostname node$role_num

## 在/etc/hosts中配上node1~3的IP。
sed -i '$a 10.8.65.63 node1' /etc/hosts
sed -i '$a 10.8.65.62 node3' /etc/hosts
sed -i '$a 10.8.65.60 node2' /etc/hosts

数据源准备

数据源使用mysql

/etc/init.d/mysqld restart
mysql
>
CREATE USER 'ct'@'%' IDENTIFIED BY '123456';
CREATE USER 'ct'@'localhost' IDENTIFIED BY '123456';
CREATE USER 'ct'@'node1' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'ct'@'%';
GRANT ALL ON *.* TO 'ct'@'localhost';
GRANT ALL ON *.* TO 'ct'@'node1';
CREATE DATABASE mytest;
use mytest;
CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTO_INCREMENT NOT NULL, name VARCHAR(255));
INSERT INTO accounts(name) VALUES('alice');
INSERT INTO accounts(name) VALUES('bob');

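产生模拟数据(注意:在mysql命令行客户端中创建下面的存储过程之前,需要先执行 DELIMITER // 修改语句结束符,并在存储过程末尾的 end 之后输入 // 结束定义,然后执行 DELIMITER ; 恢复默认结束符,否则过程体内的分号会使语句被提前截断而报错)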

create procedure genUsers()
begin
  declare i int default 0;
  while i < 40 do
  INSERT INTO accounts(name) VALUES ('num0'),('num1'),('num2'),('num3'),('num4'),('num5'),('num6'),('num7'),('num8'),('num9'),('num10'),('num11'),('num12'), ('num13'),('num14'),('num15'),('num16'),('num17'),('num18'),('num19'),('num20'),('num21'),('num22'),('num23'),('num24'), ('num25'),('num26'),('num27'),('num28'),('num29'),('num30'),('num31'),('num32'),('num33'),('num34'),('num35'),('num36'), ('num37'),('num38'),('num39'),('num40'),('num41'),('num42'),('num43'),('num44'),('num45'),('num46'),('num47'),('num48'), ('num49'),('num50'),('num51'),('num52'),('num53'),('num54'),('num55'),('num56'),('num57'),('num58'),('num59'),('num60'), ('num61'),('num62'),('num63'),('num64'),('num65'),('num66'),('num67'),('num68'),('num69'),('num70'),('num71'),('num72'), ('num73'),('num74'),('num75'),('num76'),('num77'),('num78'),('num79'),('num80'),('num81'),('num82'),('num83'),('num84'), ('num85'),('num86'),('num87'),('num88'),('num89'),('num90'),('num91'),('num92'),('num93'),('num94'),('num95'),('num96'), ('num97'),('num98'),('num99');
  set i = i + 1;
  end while;
end

call genUsers();
select count(*) from accounts;

zookeeper

只有node1上有zk,只在node1上执行

cd /usr/local/confluent-2.0.1/
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
lokkit -p 2181:tcp

broker

在node1~3上执行(以下命令使用相对路径,node2、node3上需先执行 cd /usr/local/confluent-2.0.1/ 进入confluent安装目录)

sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=node1:2181/' ./etc/kafka/server.properties
sed -i 's/broker.id=0/broker.id='$role_num'/' ./etc/kafka/server.properties
./bin/kafka-server-start ./etc/kafka/server.properties &
lokkit -p 9092:tcp

schema-registry

只在node1上执行

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
lokkit -p 8081:tcp

connector

在node1~3上执行

sed -i 's|localhost:8081|node1:8081|' ./etc/schema-registry/connect-avro-distributed.properties
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties &
lokkit -p 8083:tcp

测试

分布式的connector需要使用api来进行查询

创建connector

curl -X POST -H "Content-Type: application/json" --data '{"name": "test-mysql-jdbc-autoincrement", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max":"3", "connection.url":"jdbc:mysql://node1:3306/mytest?user=ct&password=123456","topic":"connect-test","mode":"incrementing","incrementing.column.name":"id","topic.prefix":"test-mysql-jdbc-" }}' http://localhost:8083/connectors

获取connector信息

## 获取所有的connectors
curl -X GET -H "Content-Type: application/json" http://localhost:8083/connectors 
## 获取connector的tasks
curl -X GET -H "Content-Type: application/json" http://localhost:8083/connectors/test-mysql-jdbc-autoincrement/tasks
## 获取connector的config
curl -X GET -H "Content-Type: application/json" http://localhost:8083/connectors/test-mysql-jdbc-autoincrement/config

消费

./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-mysql-jdbc-accounts --from-beginning

bin/kafka-console-consumer --zookeeper node1:2181 --topic connect-configs --from-beginning
bin/kafka-console-consumer --zookeeper node1:2181 --topic connect-offsets --from-beginning


bin/kafka-topics --zookeeper node1:2181 --describe --topic test-mysql-jdbc-accounts

删除connector

curl -X DELETE -H "Content-Type: application/json" http://localhost:8083/connectors/test-mysql-jdbc-autoincrement

分类:

技术点:

相关文章: