【Posted】: 2020-07-21 17:23:04
【Question】:
I want to write a unit test for a streaming pipeline that uses early triggers. The example pipeline looks like this:
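```python
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.utils.timestamp import Duration


class CalculateTeamScores(beam.PTransform):
    def expand(self, scores):
        return (
            scores
            # 2-minute fixed windows; fire early after every element, then on time,
            # accumulating results across panes.
            | "windowing scores" >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
                allowed_lateness=Duration(seconds=3600))
            | "preparing scores for combining" >> beam.Map(
                lambda team_score: (team_score['team'], team_score['score']))
            | "calculating team scores" >> beam.CombinePerKey(sum)
            # FormatResult is the author's DoFn (not shown in the question).
            | "forming the result" >> beam.ParDo(FormatResult()))
```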
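Both input events used in the test fall into the same two-minute window, which is why every expected result carries the same `eventTime` of (12:00:00, 12:02:00). The stdlib-only sketch below reproduces the fixed-window arithmetic, assuming a zero window offset and an arbitrary UTC date (2020-07-21), because the question only gives times of day. `fixed_window` is a hypothetical helper, not part of Beam.

```python
from datetime import datetime, timedelta, timezone

WINDOW_SIZE = 120  # seconds, matching FixedWindows(120)


def fixed_window(event_time: datetime, size: int = WINDOW_SIZE):
    """Hypothetical helper: (start, end) of the fixed window containing event_time.

    Assumes offset 0, so start = ts - (ts mod size).
    """
    ts = int(event_time.timestamp())
    start = ts - ts % size
    return (datetime.fromtimestamp(start, tz=timezone.utc),
            datetime.fromtimestamp(start + size, tz=timezone.utc))


# Assumed date for illustration; only the times of day come from the question.
day = datetime(2020, 7, 21, tzinfo=timezone.utc)
first = day + timedelta(hours=12, seconds=30)              # 12:00:30
second = day + timedelta(hours=12, minutes=1, seconds=50)  # 12:01:50

for t in (first, second):
    start, end = fixed_window(t)
    print(t.time(), "->", start.time(), end.time())
```

Since a UTC day (86400 s) is a multiple of 120 s, the window boundaries line up with whole minutes regardless of which date is assumed.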
I wrote the following test:
```python
from unittest import TestCase

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to


class TestCalculateTeamScores(TestCase):
    def test_should_sum_score_for_each_team(self):
        # given
        # team_score and timestamp_from_datetime are the author's helpers (not shown).
        p = TestPipeline()
        scores_stream = p | "loading score stream" >> (
            TestStream()
            .advance_processing_time(advance_by=timestamp_from_datetime('12:30:00', as_int=True))
            .advance_watermark_to(new_watermark=timestamp_from_datetime('12:00:00'))
            .add_elements(elements=[team_score('red', 5)],
                          event_timestamp=timestamp_from_datetime('12:00:30'))
            .advance_processing_time(advance_by=500)
            .add_elements(elements=[team_score('red', 9)],
                          event_timestamp=timestamp_from_datetime('12:01:50')))

        # when
        result = scores_stream | 'calculating team scores' >> CalculateTeamScores()

        # then: an early pane with 5, then an accumulated pane with 5 + 9 = 14
        assert_that(result, equal_to([
            {
                'team': 'red',
                'score': 5,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
            {
                'team': 'red',
                'score': 14,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
        ]))
        p.run()
```
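The test depends on two helpers the question does not show, `team_score` and `timestamp_from_datetime`. Below is a hypothetical reconstruction, assuming the timestamps are seconds since the Unix epoch on an arbitrary UTC date (2020-07-21); the author's real helpers may differ. One thing it makes visible: if `advance_by` in `advance_processing_time` is a duration (as its name suggests), then passing `timestamp_from_datetime('12:30:00', as_int=True)` would advance processing time by roughly 1.6 billion seconds rather than move it to 12:30:00.

```python
from datetime import datetime, timezone

# Assumed date: the question only shows times of day.
BASE_DATE = "2020-07-21"


def timestamp_from_datetime(time_of_day: str, as_int: bool = False):
    """Hypothetical helper: seconds since the Unix epoch for BASE_DATE + time_of_day (UTC)."""
    dt = datetime.strptime(f"{BASE_DATE} {time_of_day}", "%Y-%m-%d %H:%M:%S")
    seconds = dt.replace(tzinfo=timezone.utc).timestamp()
    return int(seconds) if as_int else seconds


def team_score(team: str, score: int) -> dict:
    """Hypothetical helper: the input record that the pipeline's Map step reads."""
    return {"team": team, "score": score}


print(timestamp_from_datetime("12:02:00") - timestamp_from_datetime("12:00:00"))
print(team_score("red", 5))
```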
As you can see, I expect an early pane with score = 5. Unfortunately, I can't get that result: instead I get 2 panes, both with score = 14.
I'm using the TestStream class, which is undocumented and not part of the public interface, but it looks like a good fit for my case.
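The gap between the expected panes [5, 14] and the observed [14, 14] can be reasoned about with a deliberately simplified model of a sum combiner under accumulating and discarding modes. This is not Beam's trigger implementation: `fire_panes` is hypothetical, and how elements are grouped per firing is an assumption about how the runner batched them, not something the question confirms.

```python
from typing import List


def fire_panes(firings: List[List[int]], accumulating: bool) -> List[int]:
    """Hypothetical, simplified model of pane contents for a sum combiner.

    `firings` lists the elements that arrived before each trigger firing;
    an empty list models a firing with no new data (e.g. an on-time pane).
    Accumulating mode re-emits the running total; discarding mode emits
    only the elements since the previous firing.
    """
    panes = []
    total = 0
    for batch in firings:
        batch_sum = sum(batch)
        total += batch_sum
        panes.append(total if accumulating else batch_sum)
    return panes


# What the test expects: one early firing per element.
print(fire_panes([[5], [9]], accumulating=True))    # [5, 14]
# One grouping that reproduces the observed output: both elements combined
# before the first firing, then a firing with no new data.
print(fire_panes([[5, 9], []], accumulating=True))  # [14, 14]
# For comparison, discarding mode with one firing per element.
print(fire_panes([[5], [9]], accumulating=False))   # [5, 9]
```

Under this model, an on-time firing after two per-element early firings would add a third pane of 14 in accumulating mode, so the two-pane expectation also implicitly assumes no on-time pane is emitted.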
【Discussion】:
Tags: python apache-beam