【问题标题】:Write a csv file to a kafka topic将 csv 文件写入 kafka 主题
【发布时间】:2020-09-14 07:25:26
【问题描述】:

我有一个很大的 csv,我想写一个 kafka 主题。

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

此代码产生错误:

AssertionError: value must be bytes

文件如下:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

谁能帮我解决这个问题?

【问题讨论】:

    标签: python apache-kafka kafka-producer-api kafka-python pykafka


    【解决方案1】:

    您需要正确序列化您的值。


    以下应该可以解决问题:

    import json  
    
    producer = KafkaProducer(
        bootstrap_servers='mykafka-broker',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    

    【讨论】:

      【解决方案2】:

      与其重新发明轮子,不如使用已经存在的非常好的一个:) Kafka Connect,它是 Apache Kafka 的一部分。

      有几个可以从 CSV 读取的连接器,包括 Kafka Connect spooldir(请参阅 example)和 Filepulse

      this talk 中了解有关 Kafka Connect 的更多信息。

      【讨论】:

        猜你喜欢
        • 2019-04-23
        • 1970-01-01
        • 2019-07-12
        • 1970-01-01
        • 1970-01-01
        • 2019-07-30
        • 2021-06-22
        • 2018-09-16
        • 1970-01-01
        相关资源
        最近更新 更多