【发布时间】:2018-10-10 16:37:38
【问题描述】:
我正在通过ReadFromPubSub 和timestamp_attribute=None 阅读消息,这应该将时间戳设置为发布时间。
这样,我最终得到了 PCollection 的 PubsubMessage 元素。
如何按顺序访问这些元素的时间戳,例如将它们保存到数据库中?我能看到的唯一属性是data 和attributes,而attributes 只有来自 Pub/Sub 的键。
编辑: 示例代码
with beam.Pipeline(options=pipeline_options) as p:
items = (p
| ReadFromPubSub(topic=args.read_topic, with_attributes=True)
| beam.WindowInto(beam.window.FixedWindows(args.time_window))
| 'FormatMessage' >> beam.Map(format_message)
| 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
args.project, write_disposition='WRITE_APPEND')
)
format_message 将采用 PubsubMessage 并返回表示要附加到表的行的字典:
def format_message(message):
formatted_message = {
'data': base64.b64encode(message.data),
'attributes': str(message.attributes)
}
return formatted_message
【问题讨论】:
-
我不熟悉python SDK,在java中你可以通过ProcessContext时间戳访问发布时间。如果您可以编辑您的问题并显示您用于获取数据和属性的代码,也许我可以帮助您找到一个等价物。
-
完成。我尝试在python sdk中寻找ProcessContext,但似乎没有
标签: python google-cloud-dataflow apache-beam