【问题标题】:accessing Kafka metadata with Python KafkaConsumer使用 Python KafkaConsumer 访问 Kafka 元数据
【发布时间】:2018-07-26 01:59:11
【问题描述】:

我有一个简单的 Kafka 阅读器类。我真的不记得我从哪里得到这个代码。可能已经找到它,或者我以前的自我可能是从各种示例中创建的。无论哪种方式,它都可以让我快速阅读 kafka 主题。

class KafkaStreamReader():
def __init__(self, schema_name, topic, server_list):
    self.schema = get_schema(schema_name)
    self.topic = topic
    self.server_list = server_list
    self.consumer =  KafkaConsumer(topic, bootstrap_servers=server_list,
                                   auto_offset_reset = 'latest',
                                   security_protocol="PLAINTEXT")


def decode(self, msg, schema):
    parsed_schema = avro.schema.parse(schema)
    bytes_reader = io.BytesIO(msg)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(parsed_schema)
    record = reader.read(decoder)
    return record

def fetch_msg(self):
    event = next(self.consumer).value
    record = self.decode(event, self.schema)
    return record

为了使用它,我实例化一个对象并循环读取数据,如下所示:

consumer = KafkaStreamReader(schema, topic,  server_list)
while True:
    message = consumer.fetch_msg()
    print message

我确信有更好的解决方案,但这对我有用。

我想从中得到的是 Kafka 记录上的元数据。另一个小组的同事使用 Java 或 Node,并且能够在记录中看到以下信息。

{ 
  topic: 'clickstream-v2.origin.test',
  value:      
  {
    schema:payload_data/jsonschema/1-0-3',
    data: [ [Object] ] },
    offset: 16,
    partition: 0,
    highWaterOffset: 17,
    key: null,
    timestamp: 2018-07-25T17:01:36.959Z 
  }
}

我想使用 Python KafkaConsumer 访问时间戳字段。

【问题讨论】:

    标签: python metadata kafka-consumer-api


    【解决方案1】:

    我有一个解决方案。如果我更改 fetch_msg 方法,我可以弄清楚如何访问它。

    def fetch_msg(self):
        event = next(self.consumer)
        timestamp = event.timestamp
        record = self.decode(event.value, self.schema)
        return record, timestamp
    

    不是最优雅的解决方案,因为我个人不喜欢返回多个值的方法。但是,它说明了如何访问我所追求的事件数据。我可以研究更优雅的解决方案

    【讨论】:

      猜你喜欢
      • 2020-02-09
      • 2010-09-05
      • 1970-01-01
      • 1970-01-01
      • 2018-01-24
      • 2020-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多