【问题标题】:Beam Kafka Streaming Input, No Output to print or textBeam Kafka 流输入,无输出打印或文本
【发布时间】:2021-07-07 02:22:17
【问题描述】:

我正在尝试使用直接运行器来计算 kafka 消息密钥。

如果我将 max_num_records =20 放入 ReadFromKafka,我可以看到打印或输出为文本的结果。 喜欢:

('2102', 5)
('2706', 5)
('2103', 5)
('2707', 5)

但是如果没有max_num_records,或者如果max_num_records大于kafka topic中的消息数,程序会继续运行但没有输出。 如果我尝试使用 beam.io.WriteToText 进行输出,则会创建一个空的临时文件夹,例如: 光束温度-StatOut-d16768eadec511eb8bd897b012f36e97

终端展示:

2.30.0: Pulling from apache/beam_java8_sdk
Digest: sha256:720144b98d9cb2bcb21c2c0741d693b2ed54f85181dbd9963ba0aa9653072c19
Status: Image is up to date for apache/beam_java8_sdk:2.30.0
docker.io/apache/beam_java8_sdk:2.30.0

如果我将 'enable.auto.commit': 'true' 放入 kafka 消费者配置中,则消息已提交,同一组中的其他客户端无法读取它们,所以我认为它正在成功读取,只是没有处理或输出。

我尝试了固定时间,滑动时间窗口,有或没有不同的触发器,没有任何变化。

试过flink runner,结果和direct runner一样。

不知道我做错了什么,有什么帮助吗?

环境: centos 7

蟒蛇

python 3.8.8

java 1.8.0_292

光束 2.30

代码如下:

direct_options = PipelineOptions([
    "--runner=DirectRunner",
    "--environment_type=LOOPBACK",
    "--streaming",
])
direct_options.view_as(SetupOptions).save_main_session = True
direct_options.view_as(StandardOptions).streaming = True

conf = {'bootstrap.servers': '192.168.75.158:9092',
        'group.id': "g17",
        'enable.auto.commit': 'false',
        'auto.offset.reset': 'earliest'}

if __name__ == '__main__':
    with beam.Pipeline(options = direct_options) as p:
        msg_kv_bytes = ( p
            | 'ReadKafka' >> ReadFromKafka(consumer_config=conf,topics=['LaneIn']))
        messages = msg_kv_bytes | 'Decode' >> beam.MapTuple(lambda k, v: (k.decode('utf-8'), v.decode('utf-8')))
        counts = (
            messages
            | beam.WindowInto(
                window.FixedWindows(10),
                trigger = AfterCount(1),#AfterCount(4),#AfterProcessingTime
                # allowed_lateness=3,
                accumulation_mode = AccumulationMode.ACCUMULATING) #ACCUMULATING #DISCARDING
            # | 'Windowsing' >> beam.WindowInto(window.FixedWindows(10, 5))
            | 'TakeKeyPairWithOne' >> beam.MapTuple(lambda k, v: (k, 1))
            | 'Grouping' >> beam.GroupByKey()
            | 'Sum' >> beam.MapTuple(lambda k, v: (k, sum(v)))
        )
        output = (
            counts
            | 'Print' >> beam.ParDo(print)
            # |  'WriteText' >> beam.io.WriteToText('/home/StatOut',file_name_suffix='.txt')
        )

【问题讨论】:

    标签: python apache-beam apache-beam-kafkaio


    【解决方案1】:

    您可能会遇到几个已知问题。 Beam 的便携式DirectRunner 目前不完全支持流式传输。要关注的相关 Jira 是 https://issues.apache.org/jira/browse/BEAM-7514 Beam 的便携式运行器(包括DirectRunner)存在一个已知问题,即流源无法正确发出消息。因此,必须提供max_num_recordsmax_read_time 参数才能将此类源转换为有界源。要关注的相关 Jira 是 https://issues.apache.org/jira/browse/BEAM-11998

    【讨论】:

    • 正如我在帖子中所说,我也尝试了 flink runner,得到了与直接 runner 相同的结果。 flink runner 目前和 direct runner 有同样的问题吗?
    • @CannonFodder 您还需要设置experiments=["use_deprecated_read"] 以及max_rum_recordsmax_read_time 中的至少一个
    猜你喜欢
    • 2017-01-12
    • 1970-01-01
    • 2011-12-19
    • 1970-01-01
    • 2021-07-05
    • 2019-11-29
    • 2022-06-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多