【问题标题】:Apache Beam WriteToKafka (python SDK) doesn't write to topic (no manifest of error)Apache Beam WriteToKafka(python SDK)不写入主题(没有错误清单)
【发布时间】:2022-07-14 02:05:36
【问题描述】:

我正在尝试使用 Apache Beam(python SDK)的 WriteToKafka 类将流写入 Kafka 主题。但是,它会无休止地运行脚本(没有错误)并且不会将流写入主题。我必须取消运行,它不会停止,也不会出错。任何帮助表示赞赏。您可以在下面找到重现问题的最小示例

from typing import Tuple
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import WriteToKafka

pipeline_options = PipelineOptions(
    runner='FlinkRunner'
)


def convert_to_int(row: str) -> int:
    print(row)
    return int(row)

bootstrap_servers = 'localhost:9092'
topic = 'test'

folder_path = os.path.dirname(__file__)
input_file = os.path.join(folder_path, 'data/test.txt')
serializer = 'org.apache.kafka.common.serialization.LongSerializer'
with beam.Pipeline(options=pipeline_options) as p:

    stream = (p 
        | "left read" >> beam.io.ReadFromText(input_file)
        # | 'With timestamps' >> beam.Map(lambda event: beam.window.TimestampedValue(event, current_timestamp_ms()))
        | 'type cast' >> beam.Map(convert_to_int).with_output_types(int)
        # Kafka write transforms expects KVs.
        | beam.Map(lambda x: (x, x)).with_output_types(Tuple[int, int])
        | 'kafka_write' >> WriteToKafka(
            producer_config={
                'bootstrap.servers': bootstrap_servers
                },
            topic=topic,
            key_serializer=serializer,
            value_serializer=serializer,
            )
        )


data/test.txt 文件包含

1
2
3

顺便说一句我已经仔细检查了主题和生产者配置。

【问题讨论】:

  • 也许是关于这个beam.apache.org/documentation/sdks/…
  • @OneCricketeer 我已编辑问题以包含最小示例。亲切的问候
  • 您是否能够确定哪一步有问题?例如,您可以从文本文件中读取并输出到单独的文本文件吗?可以直接使用 beam.Create 将元素写入 Kafka 吗?此外,如果它作为流式管道而不是批处理运行,它可能需要窗口化,那么您可以尝试向管道添加窗口化吗?

标签: python apache-kafka apache-beam apache-beam-io apache-beam-kafkaio


【解决方案1】:

查看我正在使用的 kafka 流中放入了什么

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

命令。 bin/kafka-console-producer.sh 假设键和值是字符串,并使用“org.apache.kafka.common.serialization.StringDeserializer”来反序列化记录的数据。由于保存的数据是 Long 格式(python 整数很长)。应该使用下面的命令成功反序列化主题中的数据。

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.LongDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-13
    • 2019-10-05
    • 1970-01-01
    • 1970-01-01
    • 2021-12-20
    • 2020-06-15
    相关资源
    最近更新 更多