【问题标题】:Apache Beam Python SDK: How to access timestamp of an element?Apache Beam Python SDK:如何访问元素的时间戳?
【发布时间】:2018-10-10 16:37:38
【问题描述】:

我正在通过ReadFromPubSubtimestamp_attribute=None 阅读消息,这应该将时间戳设置为发布时间。

这样,我最终得到了 PCollectionPubsubMessage 元素。

如何按顺序访问这些元素的时间戳,例如将它们保存到数据库中?我能看到的唯一属性是dataattributes,而attributes 只有来自 Pub/Sub 的键。

编辑: 示例代码

with beam.Pipeline(options=pipeline_options) as p:
    items = (p
        | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
        | beam.WindowInto(beam.window.FixedWindows(args.time_window))
        | 'FormatMessage' >> beam.Map(format_message)
        | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
            args.project, write_disposition='WRITE_APPEND')
    )

format_message 将采用 PubsubMessage 并返回表示要附加到表的行的字典:

def format_message(message):
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes)
    }
    return formatted_message

【问题讨论】:

  • 我不熟悉python SDK,在java中你可以通过ProcessContext时间戳访问发布时间。如果您可以编辑您的问题并显示您用于获取数据和属性的代码,也许我可以帮助您找到一个等价物。
  • 完成。我尝试在python sdk中寻找ProcessContext,但似乎没有

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

原来可以修改映射函数以读取其他参数:

def format_message(message, timestamp=beam.DoFn.TimestampParam):    
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)
    }

    return formatted_message

更多可能的参数: https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn

【讨论】:

  • 服务添加的“messageId”呢?似乎 DoFnProcessParams 不包含它。
  • 不确定。这是在哪里添加的?
  • 在 Pub/Sub 服务上,与发布时间相同。 cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
  • 我明白了,可能可以以类似的方式访问。也许通过ElementParam?没看过。
【解决方案2】:

您是否尝试过设置 with_attributes=True?

希望Beam docs 对您有所帮助。参数包括:

with_attributes - True - 输出元素将是 PubsubMessage 对象。默认为 False - 输出元素将是字节类型(消息数据 仅)。

【讨论】:

  • 是的,我正在使用with_attributes=True。但我找不到包含时间戳的PubsubMessage 的属性
【解决方案3】:

当您调用 beam.io.gcp.pubsub.ReadFromPubSub() 时,似乎有(新发布的?!)timestamp_attribute 参数

但我试过了,它没有像我预期的那样工作。如果有人想跟进,请在 SO 上发布新查询DataFlow (PY 2.x SDk) ReadFromPubSub :: id_label & timestamp_attribute behaving unexpectedly

【讨论】:

    猜你喜欢
    • 2019-10-05
    • 1970-01-01
    • 1970-01-01
    • 2022-11-17
    • 2017-02-07
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 1970-01-01
    相关资源
    最近更新 更多