安装

# brew install kafka

配置

"""
zookeeper配置文件/usr/local/etc/kafka/zookeeper.properties
kafka配置文件/usr/local/etc/kafka/server.properties

需要修改的地方:
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://a.b.c.d:9092
"""

启动

# zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

测试

"""
#创建topic
$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看创建的topic
$kafka-topics --list --zookeeper localhost:2181
#发送一些消息
$kafka-console-producer --broker-list localhost:9092 --topic test
#消费消息
$kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
"""

Python与kafka联动


安装依赖库

# pip2 install pykafka

生产者

# -*- coding:utf-8 -*-
"""pykafka producer examples: plain / SSL connection, sync and async produce."""

import queue  # get_delivery_report(block=False) raises queue.Empty when drained

from pykafka import KafkaClient, SslConfig  # SslConfig was used below but never imported

# Establish the Kafka connection (comma-separate "host:port" pairs for a cluster).
client = KafkaClient(hosts="192.168.1.1:9092")

# Optional TLS configuration; pass as ssl_config= to KafkaClient to enable it.
config = SslConfig(
    cafile='/your/ca.cert',
    certfile='/your/client.cert',
    keyfile='/your/client.key',
    password='unlock my client key please'
)
# SSL connection variant (keyword is hosts=, and the broker port is 9092, not 9202):
# client = KafkaClient(hosts="192.168.1.1:9092", ssl_config=config)

# List all topics, then pick one. This assignment was commented out in the
# original, which left `topic` undefined for everything below.
# print(client.topics)
topic = client.topics['topic_key']

# Synchronous producer: each produce() blocks until delivery is confirmed.
# pykafka message payloads must be bytes.
with topic.get_sync_producer() as producer:
    for i in range(4):
        producer.produce(('test message ' + str(i ** 2)).encode('utf-8'))

# Asynchronous producer with delivery reports: high throughput; delivery
# results are pulled in batches instead of per message.
with topic.get_producer(delivery_reports=True) as producer:
    count = 0
    while True:
        count += 1
        producer.produce(b'test msg', partition_key='{}'.format(count).encode('utf-8'))
        if count % 10 ** 5 == 0:  # drain pending delivery reports every 100k messages
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print('Failed to deliver msg {}: {}'.format(
                            msg.partition_key, repr(exc)))
                    else:
                        print('Successfully delivered msg {}'.format(
                            msg.partition_key))
                except queue.Empty:
                    break

# Minimal async-producer usage. NOTE: unreachable after the infinite loop
# above, and the original referenced an undefined `message` — kept only as
# an illustration.
# producer = topic.get_producer()
# producer.produce(b'payload')

消费者


#!/usr/bin/python
# -*- coding:utf-8 -*-
"""pykafka consumer examples: simple consumer and balanced (zookeeper) consumer."""

from pykafka import KafkaClient

# Connect to Kafka; comma-separate "host:port" pairs to list several brokers.
client = KafkaClient(hosts='192.168.1.1:9092')
topic = client.topics['topic_key']

# Simple consumer: reads the topic directly, no consumer-group coordination.
consumer = topic.get_simple_consumer()

# Balanced consumer: partitions are shared among consumers of the same group,
# coordinated through zookeeper. With auto_commit_enable=False no
# consumer_group is needed and the topic can be read directly.
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='192.168.1.1:2181'  # comma-separate to list several zk nodes
)

for message in consumer:
    if message is not None:
        # Print each message's offset and payload.
        print(message.offset, message.value)

for message in balanced_consumer:
    if message is not None:
        # Print each message's offset and payload.
        print(message.offset, message.value)

相关文章:

  • 2022-12-23
  • 2021-06-03
  • 2022-12-23
  • 2021-08-12
  • 2022-01-19
  • 2021-09-12
猜你喜欢
  • 2022-01-12
  • 2022-12-23
  • 2022-12-23
  • 2021-07-19
  • 2021-04-02
  • 2021-09-12
  • 2021-04-07
相关资源
相似解决方案