【问题标题】:Writing unique parquet file per windows with Apache Beam Python使用 Apache Beam Python 为每个窗口编写唯一的 parquet 文件
【发布时间】:2021-10-18 14:48:13
【问题描述】:

我正在尝试使用 apache Beam 在 30 秒内将消息从 kafka 消费者 流式传输到谷歌云存储 windows。使用 beam_nuggets.io 读取 kafka 主题。但是,我无法为每个窗口写入唯一的 parquet 文件到 GCS。 你可以在下面看到我的代码:`

import apache_beam as beam
from apache_beam.transforms.trigger import AfterAny, AfterCount, AfterProcessingTime, AfterWatermark, Repeatedly

from apache_beam.portability.api.beam_runner_api_pb2 import AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
import json
from datetime import datetime
import pandas as pd
import config as conf
import apache_beam.transforms.window as window
consumer_config = {"topic": "Uswrite",
                   "bootstrap_servers": "*.*.*.*:9092",
                   "group_id": "notification_consumer_group_33"}
folder_name = datetime.now().strftime('%Y-%m-%d')
def format_result(consume_message):
    data = json.loads(consume_message[1])
    file_name = datetime.now().strftime("%Y_%m_%d-%I_%M_%S")
    df = pd.DataFrame(data).T #, orient='index'
    df.to_parquet(f'gs://{conf.gcs}/{folder_name}/{file_name}.parquet',
               storage_options={"token": "gcp.json"}, engine='fastparquet')
    print(consume_message)
with beam.Pipeline(options=PipelineOptions()) as p:
    consumer_message = (p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
                          | 'Windowing' >> beam.WindowInto(window.FixedWindows(30),
                                            trigger=AfterProcessingTime(30),
                                            allowed_lateness=900,
                                           accumulation_mode=AccumulationMode.ACCUMULATING)
                          | 'CombineGlobally' >> beam.Map(format_result))

# window.FixedWindows(30),trigger=beam.transforms.trigger.AfterProcessingTime(30),
# accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
# allowed_lateness=100,CombineGlobally(format_result).without_defaults() allowed_lateness=30,

使用上面的代码,为每条消息生成一个新的 parquet 文件。我想做的是按 30 秒的窗口对消息进行分组,并为每个窗口生成一个镶木地板文件。 我在下面尝试了不同的配置,但没有成功: beam.CombineGlobally(format_result).without_defaults()) 而不是 beam.Map(format_result)) 光束.ParDo(format_result)) 另外,我还有几个问题:

  1. 即使我通过“auto.offset.reset”设置偏移量:“最早”, 即使我更改,kafka 生产者也会从最后一条消息开始读取 消费者群体,不知道为什么。
  2. 另外,我对trigger、allowed_lateness、accumulation_mode的用法感到困惑。 我不确定我是否需要它们来完成这项任务。

正如您在代码块中看到的 上面,我也尝试使用这些参数,但没有帮助。

我到处搜索,但找不到一个解释此用例的示例。 `

【问题讨论】:

    标签: python-3.x apache-kafka google-cloud-storage apache-beam software-design


    【解决方案1】:

    您应该对管道进行一些更改以获得此结果:

    • 如果您希望每个窗口有一个输出,请删除您的 trigger。只有在每个窗口获得多个结果时才需要触发器。
    • 添加GroupByKeyCombine 操作以聚合元素。没有这样的操作,窗口化是没有效果的。
    • 我建议使用来自 Beam 项目本身的parquetio,以确保您获得可扩展的一次性行为。 (See the pydoc from 2.33.0 release)

    【讨论】:

    • 您好,由于字数限制,我在下面回答了我的问题。感谢您的帮助!
    【解决方案2】:

    我查看了 python 文档中的 GroupByKey 示例

    1. 我从 KafkaConsumer 读取的消息(我使用了来自 beam_nuggets.io) 有一个元组类型,并且为了使用 GroupByKey,我尝试在 convert_to_list 函数中创建一个列表 通过附加我从 Kafka Consumer 获得的元组。然而, GroupByKey 仍然不产生任何输出。
    import apache_beam as beam
    from beam_nuggets.io import kafkaio
    
    new_list = []
    def convert_to_list(consume_message):
        new_list.append(consume_message)
        return new_list
    
    
    with beam.Pipeline() as pipeline:
        dofn_params = (
                pipeline
                | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
                | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
                | 'consume message added list' >> beam.ParDo(convert_to_list)
                | 'GroupBykey' >> beam.GroupByKey()
                | 'print' >> beam.Map(print))
    
    1. 我也尝试了类似的管道,但这次,我创建了一个列表 使用 beam.Create() 而不是从 kafka 读取的元组,它 工作成功。您可以在下面查看此管道:
    import apache_beam as beam
    from beam_nuggets.io import kafkaio
    
    
    with beam.Pipeline() as pipeline:
        dofn_params = (
                pipeline
                | 'Created Pipeline' >> beam.Create([(None, '{"userId": "921","xx":"123"]),(None, '{"userId": "92111","yy":"123"]))
                | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
                | 'GroupBykey' >> beam.GroupByKey()
                | 'print' >> beam.Map(print))
    

    我认为第一种方法中的问题与生成外部列表而不是 pcollection 有关,但我不确定。你能指导我如何进行吗?

    我尝试的另一件事是使用 apache_beam.io.kafka 模块中的 ReadFromKafka 函数。但是这次我得到了以下错误:

    ERROR:apache_beam.utils.subprocess_server:Starting job service with ['java', '-jar', 'user_directory’/.apache_beam/cache/jars\\beam-sdks-java-io-expansion-service-2.33.0.jar', '59627']
    

    Java 版本 11.0.12 已安装在我的计算机上,并且“java”命令可用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-01-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-28
      • 1970-01-01
      • 2016-01-01
      • 1970-01-01
      相关资源
      最近更新 更多