【问题标题】:How to create Transform in Memsql when source is Kafka Avro Format当源是 Kafka Avro 格式时如何在 Memsql 中创建转换
【发布时间】:2018-12-09 23:51:36
【问题描述】:

我能够将数据从 Kafka 推送到 Memsql。

我正在尝试使用 Transform 进行推送。我在 Python 中创建了 Kafka Consumer,它使用来自 Kafka Topic 的数据并转换为 Json 格式。

我不知道如何在 Memsql 中将其用作 Transform。

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import sys

c = AvroConsumer({
    'bootstrap.servers': 'X.Y.Z.W:9092',
    'group.id': 'groupid1112',
    'schema.registry.url': 'http://X.Y.Z.W:8081',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
    })

c.subscribe(['test_topic'])
count =0
while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    valueList = list(msg.value().values())
    print(valueList)

c.close()

正在打印

[1518776144187, 1, 2, 103,'asas',asas'eer',None]

【问题讨论】:

  • 试试print(msg.value()),否则MemSQL有JDBC Driver,可以配合JDBC Kafka Connect读取Avro数据写入MemSQL

标签: python apache-kafka singlestore confluent-platform confluent-schema-registry


【解决方案1】:

检查这些文档 https://docs.memsql.com/memsql-pipelines/v6.0/transforms/

敬请期待即将发布的 MemSQL 版本中的原生 avro 支持。

你会想要做类似下面的事情,但是因为我不知道我头顶上的 avro 库,所以和我一起勾勒出 avro 特定的细节。

```

def input_stream():
    """
        Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = sys.stdin.read(8)
        if len(byte_len) == 8:
            byte_len = struct.unpack("L", byte_len)[0]
            result = sys.stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return

avro_context = WhateverYouNeed() # maybe connect to schema registry here if you need to

for msg in input_stream():
    object = DeserializeAvro(avro_context, msg) # this is your code
    sys.stdout.write(SerializeToTSV(object)) # also your code

```

使用模式注册表应该没问题,但您不必担心在转换脚本中从 kafka 读取的细节。我可以在星期一尝试为您提供更详细的脚本,但这是构建代码的方式。

【讨论】:

  • 我已经浏览了 docs man 但没有办法在管道中配置注册表,所以我只问了如何在 avro 案例中使用转换。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-06-22
  • 2020-10-12
  • 2020-06-10
  • 1970-01-01
  • 1970-01-01
  • 2018-05-31
  • 1970-01-01
相关资源
最近更新 更多