【发布时间】:2018-12-09 23:51:36
【问题描述】:
我能够将数据从 Kafka 推送到 Memsql。
我正在尝试使用 Transform 进行推送。我在 Python 中创建了 Kafka Consumer,它使用来自 Kafka Topic 的数据并转换为 Json 格式。
我不知道如何在 Memsql 中将其用作 Transform。
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import sys
c = AvroConsumer({
'bootstrap.servers': 'X.Y.Z.W:9092',
'group.id': 'groupid1112',
'schema.registry.url': 'http://X.Y.Z.W:8081',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['test_topic'])
count =0
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
valueList = list(msg.value().values())
print(valueList)
c.close()
正在打印
[1518776144187, 1, 2, 103,'asas',asas'eer',None]
【问题讨论】:
-
试试
print(msg.value()),否则MemSQL有JDBC Driver,可以配合JDBC Kafka Connect读取Avro数据写入MemSQL
标签: python apache-kafka singlestore confluent-platform confluent-schema-registry