castlevania
kafka端
 
consumer vpc版代码
 
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# context.check_hostname = True

consumer = KafkaConsumer(bootstrap_servers=[\'192.168.xx.xx:9092\'],
                        group_id=\'xx\',
                        api_version = (0,10)
                        )

print(\'consumer start to consuming...\')
consumer.subscribe((\'xx\',))
for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.key)
    print(message.value)
    print(message.partition)

 

producer vpc版代码

#!/usr/bin/env python
# encoding: utf-8

import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=[\'192.168.xx.xx:9092\'],
                        api_version = (0,10),
                        retries=5)

partitions = producer.partitions_for(\'xx\')
print(\'Topic下分区: %s\' % partitions)

try:
    future = producer.send(topic=\'xx\', value=b\'hello aliyun-kafka!\')
    future.get()
    print(\'send message succeed.\')
except KafkaError as e:
    print(\'send message failed.\')
    print(e)

consumer公网版代码

import ssl
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError


context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")

consumer = KafkaConsumer(bootstrap_servers=[\'kafka-ons-internet.aliyun.com:8080\'],
                        group_id=\'xxx\',
                        sasl_mechanism="PLAIN",
                        ssl_context=context,
                        security_protocol=\'SASL_SSL\',
                        api_version = (0,10),
                        sasl_plain_username=\'xxx\',
                        sasl_plain_password=\'1234567890\')

print(\'consumer start to consuming...\')
consumer.subscribe((\'xxx\', ))
for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.value)
    break

 

 
producer 公网版代码
#!/usr/bin/env python
# encoding: utf-8

import ssl
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError

context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")
#这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo

producer = KafkaProducer(bootstrap_servers=[\'kafka-ons-internet.aliyun.com:8080\'],
                        sasl_mechanism="PLAIN",
                        ssl_context=context,
                        security_protocol=\'SASL_SSL\',
                        api_version = (0,10),
                        retries=5,
                        sasl_plain_username=\'xx\',
                        sasl_plain_password=\'1234567890\'#注意是access-key的最后十位)

partitions = producer.partitions_for(\'xxx\')
print (\'Topic下分区: %s\' % partitions)

try:
    future = producer.send(\'xxx\', b\'hello aliyun-kafka!\')
    future.get()
    print(\'send message succeed.\')
except KafkaError as e:
    print(\'send message failed.\')
    print(e)

 

 

 

 

从阿里云控台获得连接信息

 

 

分类:

技术点:

相关文章:

  • 2021-04-23
  • 2022-01-03
  • 2021-09-17
  • 2022-12-23
  • 2021-08-02
  • 2021-12-18
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2021-08-12
  • 2021-11-24
  • 2022-03-06
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案