【发布时间】:2019-12-27 14:09:23
【问题描述】:
所以,我没有想法。 我会在我的 localpc 中使用位于 minikube 上的生产者(用 python 和 kafka-python 库编写)到达一个 kafka 集群。
生产者代码是:
byte_log = str.encode(f"many stuff")
try:
producer = KafkaProducer(bootstrap_servers=['local-ip:9092'])
future = producer.send('flask.logs', byte_log)
record_metadata = future.get(timeout=10)
print(f"record_metadata.topic {record_metadata.topic}")
print(f"record_metadata.partition {record_metadata.partition} ")
print(f"record_metadata.offset {record_metadata.offset}")
except Exception as e:
print("[KAFKA-P] bad post")
raise e
我尝试了另一种方法来创建生产者并发送消息:
producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
并发送
future = producer.send('flask.logs', json.dumps(log))
两个代码的错误是一样的:
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Timeout after waiting for 10 secs.
另外生产者挂载的容器时区相同,那么时间戳没有问题(可能)。
为了到达 kafka,我创建了一个端点和一个服务,用于映射 localhost 端口 9092 和 pod 的端口 9092 服务部署是(注意没有选择器标签):
kind: Service
apiVersion: v1
metadata:
name: local-ip
spec:
ports:
- protocol: TCP
port: 9092
targetPort: 9092
type: ClusterIP
端点是:
kind: Endpoints
apiVersion: v1
metadata:
name: local-ip
subsets:
- addresses:
- ip: 192.168.99.101
ports:
- port: 9092
可能是 Kube-DNS 的问题? 如果是,我怎样才能找到它??
【问题讨论】:
标签: python apache-kafka minikube