【发布时间】:2019-09-19 15:10:22
【问题描述】:
我正在通过 Pub/sub 摄取数据(json 文件)
所以我的 event-time 默认是在主题中发布的时间。
我想强制事件时间并改变它。
我在数据中添加了一个日期时间字段。
我想根据我的json文件的新时间戳字段进行聚合和组合。
Ps:该字段名为“timestamp”,是一个字符串。这就是为什么我将其转换为日期时间,然后是数据流中的时间戳
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))
稍后我将在执行窗口操作之前调用管道中的函数
我从 pubsub 收到我的数据:
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
subscription=known_args.in_topic).with_input_types(str)
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'jsonload' >> beam.Map(lambda x: json.loads(x))
然后做我的处理:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)
【问题讨论】:
-
代码乍一看是有效的。您对此有什么具体问题吗?
-
每次打印(就在'CountGlobally'步骤之后,我得到的结果不止一个!通常如果我使用后水印触发器+累积模式,一旦水印到达我应该只有一个结果窗口的尽头!但我得到的不止一个。
标签: python timestamp apache-beam dataflow