kafka端
consumer vpc版代码
import socket from kafka import KafkaConsumer from kafka.errors import KafkaError # context.check_hostname = True consumer = KafkaConsumer(bootstrap_servers=[\'192.168.xx.xx:9092\'], group_id=\'xx\', api_version = (0,10) ) print(\'consumer start to consuming...\') consumer.subscribe((\'xx\',)) for message in consumer: print(message.topic) print(message.offset) print(message.key) print(message.value) print(message.partition)
producer vpc版代码
#!/usr/bin/env python # encoding: utf-8 import socket from kafka import KafkaProducer from kafka.errors import KafkaError producer = KafkaProducer(bootstrap_servers=[\'192.168.xx.xx:9092\'], api_version = (0,10), retries=5) partitions = producer.partitions_for(\'xx\') print(\'Topic下分区: %s\' % partitions) try: future = producer.send(topic=\'xx\', value=b\'hello aliyun-kafka!\') future.get() print(\'send message succeed.\') except KafkaError as e: print(\'send message failed.\') print(e)
consumer公网版代码
import ssl import socket from kafka import KafkaConsumer from kafka.errors import KafkaError context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert") consumer = KafkaConsumer(bootstrap_servers=[\'kafka-ons-internet.aliyun.com:8080\'], group_id=\'xxx\', sasl_mechanism="PLAIN", ssl_context=context, security_protocol=\'SASL_SSL\', api_version = (0,10), sasl_plain_username=\'xxx\', sasl_plain_password=\'1234567890\') print(\'consumer start to consuming...\') consumer.subscribe((\'xxx\', )) for message in consumer: print(message.topic) print(message.offset) print(message.value) break
producer 公网版代码
#!/usr/bin/env python # encoding: utf-8 import ssl import socket from kafka import KafkaProducer from kafka.errors import KafkaError context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert") #这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo producer = KafkaProducer(bootstrap_servers=[\'kafka-ons-internet.aliyun.com:8080\'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol=\'SASL_SSL\', api_version = (0,10), retries=5, sasl_plain_username=\'xx\', sasl_plain_password=\'1234567890\'#注意是access-key的最后十位) partitions = producer.partitions_for(\'xxx\') print (\'Topic下分区: %s\' % partitions) try: future = producer.send(\'xxx\', b\'hello aliyun-kafka!\') future.get() print(\'send message succeed.\') except KafkaError as e: print(\'send message failed.\') print(e)
从阿里云控台获得连接信息