【问题标题】:How to produce Kafka messages with JSON format in Python如何在 Python 中生成 JSON 格式的 Kafka 消息
【发布时间】:2020-09-20 18:13:59
【问题描述】:

如何删除报价并像原始格式一样发送数据 原始 JSON 格式为:

{
  "@timestamp": "2020-06-02T09:38:03.183186Z"
}

此数据在另一个主题中

"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"

这是在服务器之间发送数据的代码

def parse(d):   
    if str(type(d)) == "<class 'dict'>":       
        return (r)
    return -1

producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))  # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
                                 auto_offset_reset=param["AUTO_OFFSET_RESET"],
                                 consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
                                 enable_auto_commit=False,
                                 auto_commit_interval_ms=60000,
                                 group_id=param["GROUP_ID"],
                                 client_id=param["CLIENT_ID"]
                                 )
consumer.subscribe([param["TOPIC_IN"]])
 while True:
      num_rows = 0
      for msg in consumer:
          num_rows = num_rows + 1
          m = json.loads(msg.value)
          j = parse(m)
          if j != -1:
             data = json.dumps(j)
             producer.send(param["TOPIC_OUT"], value=data)

【问题讨论】:

    标签: python apache-kafka kafka-producer-api kafka-python


    【解决方案1】:

    您当前正在将值序列化为字符串。如果你想要 JSON 而不是字符串,那么你需要正确序列化你的值。


    以下应该可以解决问题:

    import json  
    
    producer = KafkaProducer(
        bootstrap_servers='mykafka-broker',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    

    【讨论】:

      猜你喜欢
      • 2019-08-27
      • 2021-02-16
      • 2016-08-13
      • 2022-01-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多