【问题标题】:How to change the event time in apache beam?如何更改 apache Beam 中的事件时间?
【发布时间】:2019-09-19 15:10:22
【问题描述】:

我正在通过 Pub/sub 摄取数据(json 文件) 所以我的 event-time 默认是在主题中发布的时间。 我想强制事件时间并改变它。 我在数据中添加了一个日期时间字段。
我想根据我的json文件的新时间戳字段进行聚合和组合。

Ps:该字段名为“timestamp”,是一个字符串。这就是为什么我将其转换为日期时间,然后是数据流中的时间戳

def get_timestamp(data):
    my_date = (data['timestamp']) # date : 2010-09-18......string
    times = datetime.fromisoformat(my_date) #type: datetime.datetime
    return beam.window.TimestampedValue(data, datetime.timestamp(times))

稍后我将在执行窗口操作之前调用管道中的函数

我从 pubsub 收到我的数据:

lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
        subscription=known_args.in_topic).with_input_types(str) 
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8')) 
        | 'jsonload' >> beam.Map(lambda x: json.loads(x))

然后做我的处理:

 (lines |'timestamp' >> beam.Map(get_timestamp)
           | 'print timestamp' >> beam.ParDo(PrintFn2())
           | 'window' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
    )

【问题讨论】:

  • 代码乍一看是有效的。您对此有什么具体问题吗?
  • 每次打印(就在'CountGlobally'步骤之后,我得到的结果不止一个!通常如果我使用后水印触发器+累积模式,一旦水印到达我应该只有一个结果窗口的尽头!但我得到的不止一个。

标签: python timestamp apache-beam dataflow


【解决方案1】:

从 PubSub 读取时,为元素设置 EvenTime 的最佳方法是使用

JavawithTimestampAttribute

Python timestamp_attribute

这将设置元素时间戳并确保watermark signals 具有良好的数据。

如果这不是一个选项,您可以按照Adding Timestamps to a PCollection 更改 DoFn 中元素的时间戳。但是,此方法不允许设置时间戳

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-05
    • 2022-11-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多