【发布时间】:2021-10-18 14:48:13
【问题描述】:
我正在尝试使用 apache Beam 在 30 秒内将消息从 kafka 消费者 流式传输到谷歌云存储 windows。使用 beam_nuggets.io 读取 kafka 主题。但是,我无法为每个窗口写入唯一的 parquet 文件到 GCS。 你可以在下面看到我的代码:`
import apache_beam as beam
from apache_beam.transforms.trigger import AfterAny, AfterCount, AfterProcessingTime, AfterWatermark, Repeatedly
from apache_beam.portability.api.beam_runner_api_pb2 import AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
import json
from datetime import datetime
import pandas as pd
import config as conf
import apache_beam.transforms.window as window
consumer_config = {"topic": "Uswrite",
"bootstrap_servers": "*.*.*.*:9092",
"group_id": "notification_consumer_group_33"}
folder_name = datetime.now().strftime('%Y-%m-%d')
def format_result(consume_message):
data = json.loads(consume_message[1])
file_name = datetime.now().strftime("%Y_%m_%d-%I_%M_%S")
df = pd.DataFrame(data).T #, orient='index'
df.to_parquet(f'gs://{conf.gcs}/{folder_name}/{file_name}.parquet',
storage_options={"token": "gcp.json"}, engine='fastparquet')
print(consume_message)
with beam.Pipeline(options=PipelineOptions()) as p:
consumer_message = (p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
| 'Windowing' >> beam.WindowInto(window.FixedWindows(30),
trigger=AfterProcessingTime(30),
allowed_lateness=900,
accumulation_mode=AccumulationMode.ACCUMULATING)
| 'CombineGlobally' >> beam.Map(format_result))
# window.FixedWindows(30),trigger=beam.transforms.trigger.AfterProcessingTime(30),
# accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
# allowed_lateness=100,CombineGlobally(format_result).without_defaults() allowed_lateness=30,
使用上面的代码,为每条消息生成一个新的 parquet 文件。我想做的是按 30 秒的窗口对消息进行分组,并为每个窗口生成一个镶木地板文件。 我在下面尝试了不同的配置,但没有成功: beam.CombineGlobally(format_result).without_defaults()) 而不是 beam.Map(format_result)) 光束.ParDo(format_result)) 另外,我还有几个问题:
- 即使我通过“auto.offset.reset”设置偏移量:“最早”, 即使我更改,kafka 生产者也会从最后一条消息开始读取 消费者群体,不知道为什么。
- 另外,我对trigger、allowed_lateness、accumulation_mode的用法感到困惑。 我不确定我是否需要它们来完成这项任务。
正如您在代码块中看到的 上面,我也尝试使用这些参数,但没有帮助。
我到处搜索,但找不到一个解释此用例的示例。 `
【问题讨论】:
标签: python-3.x apache-kafka google-cloud-storage apache-beam software-design