【Question title】: How to unit test early triggering in Apache Beam (Python SDK)
【Posted】: 2020-07-21 17:23:04
【Question description】:

I want to write a unit test for a streaming pipeline that uses early triggering. An example pipeline looks like this:

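import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.utils.timestamp import Duration


class CalculateTeamScores(beam.PTransform):
    def expand(self, scores):
        return (
            scores
            # Two-minute fixed windows with an early firing after every element.
            | "windowing scores" >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
                allowed_lateness=Duration(seconds=3600))
            | "preparing scores for combining" >> beam.Map(
                lambda team_score: (team_score['team'], team_score['score']))
            | "calculating team scores" >> beam.CombinePerKey(sum)
            # FormatResult is the poster's own DoFn; its definition is not shown.
            | "forming the result" >> beam.ParDo(FormatResult()))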

I wrote the following test:

from unittest import TestCase

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to


class TestCalculateTeamScores(TestCase):

    def test_should_sum_score_for_each_team(self):
        # given
        # timestamp_from_datetime and team_score are the poster's own helpers (not shown).
        p = TestPipeline()

        scores_stream = p | "loading score stream" >> (
            TestStream()
            .advance_processing_time(
                advance_by=timestamp_from_datetime('12:30:00', as_int=True))
            .advance_watermark_to(new_watermark=timestamp_from_datetime('12:00:00'))
            .add_elements(elements=[team_score('red', 5)],
                          event_timestamp=timestamp_from_datetime('12:00:30'))
            .advance_processing_time(advance_by=500)
            .add_elements(elements=[team_score('red', 9)],
                          event_timestamp=timestamp_from_datetime('12:01:50')))

        # when
        result = scores_stream | 'calculating team scores' >> CalculateTeamScores()

        # then
        assert_that(result, equal_to([
            {
                'team': 'red',
                'score': 5,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
            {
                'team': 'red',
                'score': 14,
                'eventTime': (timestamp_from_datetime('12:00:00'),
                              timestamp_from_datetime('12:02:00'))
            },
        ]))
        p.run()
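
The test calls two helpers, timestamp_from_datetime and team_score, whose definitions are not included in the post. The following is only a guess at what they might look like, using the standard library alone. The fixed date, the UTC assumption, and the choice to return plain epoch seconds (rather than a Beam Timestamp object) are all assumptions made for illustration, not the poster's actual code.

```python
from datetime import datetime, timezone

# Hypothetical fixed test date; the original post does not say which date is used.
TEST_DATE = "2020-07-21"


def timestamp_from_datetime(time_str, as_int=False):
    """Convert 'HH:MM:SS' on TEST_DATE (assumed UTC) to seconds since the Unix epoch."""
    dt = datetime.strptime(f"{TEST_DATE} {time_str}", "%Y-%m-%d %H:%M:%S")
    seconds = dt.replace(tzinfo=timezone.utc).timestamp()
    return int(seconds) if as_int else seconds


def team_score(team, score):
    """Build one input element in the shape the pipeline's Map step reads."""
    return {"team": team, "score": score}
```

With these definitions, 12:00:00 and 12:02:00 are exactly 120 seconds apart and 12:00:00 falls on a multiple of 120, so both test elements would land in the same two-minute window.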

As you can see, I expect an early pane with score = 5. Unfortunately, I cannot get that result: I get 2 panes, each with score = 14.
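
To make the expected values concrete, here is a small pure-Python model of an early firing after every element. It is a simplified sketch and does not use or reproduce Beam's trigger implementation; the function names and the seconds-since-midnight timestamps are made up for illustration. It shows why 5 and then 14 are the expected early panes in accumulating mode, and what discarding mode would give instead.

```python
from collections import defaultdict

WINDOW_SIZE = 120  # seconds, matching FixedWindows(120) in the pipeline


def window_bounds(event_time, size=WINDOW_SIZE):
    """Return the [start, end) fixed window that contains event_time (in seconds)."""
    start = event_time - event_time % size
    return (start, start + size)


def early_panes(events, accumulating=True):
    """Model one early firing per element (hypothetical helper, not a Beam API).

    events: iterable of (team, score, event_time_seconds) in arrival order.
    Returns a list of (team, window, pane_sum) in firing order.
    """
    totals = defaultdict(int)
    panes = []
    for team, score, event_time in events:
        key = (team, window_bounds(event_time))
        if accumulating:
            totals[key] += score  # pane covers everything seen so far in the window
        else:
            totals[key] = score   # pane covers only the elements since the last firing
        panes.append((team, key[1], totals[key]))
    return panes


# 12:00:30 and 12:01:50 expressed as seconds since midnight (an assumption for brevity).
events = [("red", 5, 12 * 3600 + 30), ("red", 9, 12 * 3600 + 110)]

for pane in early_panes(events):
    print(pane)
```

Both elements fall into the window from 12:00:00 to 12:02:00, so the accumulating model yields sums of 5 and then 14, while passing accumulating=False yields 5 and then 9.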

I am using the TestStream class, which is undocumented and not part of the public interface, but it looks like a good fit for my case.

【Question discussion】:

    Tags: python apache-beam


    【Solution 1】:

    I could not get the early pane because the TestPipeline was set up incorrectly: it was not created in streaming mode. It should be:

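    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
    from apache_beam.testing.test_pipeline import TestPipeline

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    p = TestPipeline(options=options)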
    

    It looks like TestStream works well for testing early triggering.

    【Discussion】:
