【Question title】: Problem with state and timers - apache beam
【Posted】: 2019-07-26 13:46:59
【Question】:

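I am trying to recreate a simple example of an Apache Beam pipeline described in this blog post, which uses state and timers.

I wrote this code to try out what the blog post describes. The code should simply enrich string records by appending ": enriched" to each record. I ran into an error before I got as far as adding the "stale" timer.

I am using Apache Beam 2.13 with the direct runner.

Here is the enrichment DoFn, mostly copy-pasted from the blog: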

public class Enrich extends DoFn<KV<String, String>, String> {
  private static final long serialVersionUID = 1L;

  private static final int MAX_BUFFER_SIZE = 2;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferedEvents =
    StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState =
    StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = 
    TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @StateId("buffer") BagState<String> bufferState,
      @StateId("count") ValueState<Integer> countState,
      @TimerId("expiry") Timer expiryTimer) {

    Duration allowedLateness = Duration.standardSeconds(10);

    expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

    int count = firstNonNull(countState.read(), 0);
    count = count + 1;
    countState.write(count);
    bufferState.add(context.element().getValue());

    if (count >= MAX_BUFFER_SIZE) {
      for (String event : bufferState.read()) {
        context.output(enrichEvent(event));
      }
      bufferState.clear();
      countState.clear();
    }
  }

  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<String> bufferState) {

    if (!bufferState.isEmpty().read()) {
      for (String event : bufferState.read()) {
        context.output(enrichEvent(event));
      }
      bufferState.clear();
    }
  }

  public static String enrichEvent(String event) {
    return event + ": enriched";
  }

  public static int firstNonNull(Integer x, Integer y) {
    if (x == null) {
      return y;
    }
    return x;
  }
}
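
To make the intended buffering easier to follow, here is a minimal plain-Java sketch of what the DoFn above does for a single key and window. It uses no Beam types: the class name BufferSketch is hypothetical, the list stands in for the "buffer" bag state, and the buffer size stands in for the separate "count" state. It only illustrates the flow and is not how Beam manages state internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the buffering in Enrich for one key and one window (hypothetical). */
public class BufferSketch {
  static final int MAX_BUFFER_SIZE = 2;

  // Stands in for the "buffer" BagState; its size replaces the "count" ValueState.
  private final List<String> buffer = new ArrayList<>();

  /** Mirrors @ProcessElement: buffer the event and flush once the buffer is full. */
  public List<String> process(String event) {
    buffer.add(event);
    if (buffer.size() >= MAX_BUFFER_SIZE) {
      return flush();
    }
    return new ArrayList<>();
  }

  /** Mirrors @OnTimer("expiry"): emit whatever is still buffered. */
  public List<String> onExpiry() {
    return flush();
  }

  private List<String> flush() {
    List<String> out = new ArrayList<>();
    for (String event : buffer) {
      out.add(event + ": enriched");
    }
    buffer.clear();
    return out;
  }

  public static void main(String[] args) {
    BufferSketch sketch = new BufferSketch();
    System.out.println(sketch.process("1")); // []
    System.out.println(sketch.process("2")); // [1: enriched, 2: enriched]
    System.out.println(sketch.process("3")); // []
    System.out.println(sketch.onExpiry());   // [3: enriched]
  }
}
```

With three records and a buffer size of 2, the third record is only ever emitted from the expiry callback, which is why the timestamp used there matters.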

Here is the code I use to test the Enrich DoFn:

@RunWith(JUnit4.class)
public class EnrichTest {
  final Logger LOG = LoggerFactory.getLogger(EnrichTest.class);

  @Rule
  public TestPipeline p = TestPipeline.create();

  static final String record1 = "1";
  static final String record2 = "2";
  static final String record3 = "3";

  static final String key = "a key";

  static final String result1 = "1: enriched";
  static final String result2 = "2: enriched";
  static final String result3 = "3: enriched";

  @Test
  public void testSimple() throws Exception {
    Duration ALLOWED_LATENESS = Duration.standardSeconds(10);
    Duration WINDOW_DURATION = Duration.standardSeconds(10);
    Instant baseTime = new Instant(0L);
    KvCoder<String, String> coder = 
      KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class));

    TestStream<KV<String, String>> items = 
        TestStream
          .create(coder)
          .advanceWatermarkTo(baseTime)
          .addElements(
              TimestampedValue.of(
                KV.of(key, record1),
                baseTime.plus(Duration.standardSeconds(1))))
          .addElements(
              TimestampedValue.of(
                KV.of(key, record2),
                baseTime.plus(Duration.standardSeconds(0))))
          .advanceWatermarkTo(
              baseTime.plus(Duration.standardSeconds(11)))
          .addElements(
              TimestampedValue.of(
                KV.of(key, record3),
                baseTime.plus(Duration.standardSeconds(2))))
          .advanceWatermarkToInfinity();

    PCollection<String> results = 
        p.apply(items)
         .apply(new CreateWindows (WINDOW_DURATION, ALLOWED_LATENESS))
         .apply(ParDo.of(new Enrich()));

    PAssert
      .that(results)
      .inWindow(new IntervalWindow(baseTime, WINDOW_DURATION))
      .containsInAnyOrder(result1, result2, result3);

    p.run().waitUntilFinish();
  }
}

Here is my windowing transform:

public class CreateWindows extends 
  PTransform<PCollection<KV<String, String>>,
             PCollection<KV<String, String>>> {

  private static final long serialVersionUID = 1L;
  private final Duration windowDuration;
  private final Duration allowedLateness;

  public CreateWindows(Duration windowDuration, Duration allowedLateness) {
    this.windowDuration = windowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, String>> expand(
    PCollection<KV<String, String>> items) {

    return items.apply("Aggregate fixed window",
      Window.<KV<String, String>>into(FixedWindows.of(windowDuration))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(allowedLateness));
  }
}

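As the code above shows, I use fixed windows of 10 seconds. The allowed lateness is also set to 10 seconds.

Note also that the expiry timer is set with expiryTimer.set(window.maxTimestamp().plus(allowedLateness));, as described in the blog post. In my test I add the first 2 records on time, advance the watermark to 11 seconds, and then add the last record to test what happens when late data arrives.

When I run the test, I get the following error: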
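
As a rough check on which records in the test count as late, the following plain-Java sketch classifies an arriving element by comparing the current watermark with the end of its window. It uses java.time rather than Beam's Joda-Time types, the class and method names are hypothetical, and the exact boundary handling is a simplification of what a runner does.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of on-time / late / expired for a fixed window (hypothetical names). */
public class LatenessSketch {
  public enum Status { ON_TIME, LATE, EXPIRED }

  /** Last timestamp that belongs to the window [start, start + size). */
  public static Instant maxTimestamp(Instant windowStart, Duration size) {
    return windowStart.plus(size).minusMillis(1);
  }

  /** Classifies an element arriving in the given window at the given watermark. */
  public static Status classify(
      Instant watermark, Instant windowStart, Duration size, Duration allowedLateness) {
    Instant max = maxTimestamp(windowStart, size);
    if (!watermark.isAfter(max)) {
      return Status.ON_TIME; // watermark has not passed the end of the window
    }
    if (!watermark.isAfter(max.plus(allowedLateness))) {
      return Status.LATE; // past the window end, still within allowed lateness
    }
    return Status.EXPIRED; // past allowed lateness
  }

  public static void main(String[] args) {
    Duration size = Duration.ofSeconds(10);
    Duration lateness = Duration.ofSeconds(10);
    // Records 1 and 2 arrive while the watermark is at 0 s.
    System.out.println(classify(Instant.EPOCH, Instant.EPOCH, size, lateness)); // ON_TIME
    // Record 3 (timestamp 2 s, so window [0 s, 10 s)) arrives with the watermark at 11 s.
    System.out.println(classify(Instant.ofEpochSecond(11), Instant.EPOCH, size, lateness)); // LATE
  }
}
```

Under these assumptions the third record is late but still within the allowed lateness, so it reaches the DoFn and sits in the buffer until the expiry timer fires.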

java.lang.IllegalStateException: TimestampCombiner moved element from 1970-01-01T00:00:19.999Z to earlier time 1970-01-01T00:00:09.999Z for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:10.000Z)

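I expected this code to handle late data, especially since the expiry timer is set to window.maxTimestamp().plus(allowedLateness).

The blog post does not say which windowing strategy it uses. Could that be the problem? I also tried Never.ever() as the window trigger, but I get the same error: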

.triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));

Sorry for the long post; any help would be appreciated.

【Comments】:

    Tags: timer state apache-beam


    【Solution 1】:

    In the end, I edited onExpiry to use context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp()); instead of context.output(enrichEvent(event));. That solved the problem.

    Here is the corrected onExpiry method.

        @OnTimer("expiry")
        public void onExpiry(
          OnTimerContext context, BoundedWindow window,
          @StateId("buffer") BagState<String> bufferState) {
    
            if (!bufferState.isEmpty().read()) {
              for (String event : bufferState.read()) {
                context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
              }
              bufferState.clear();
            }
        }
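
One plausible reading of why this helps, offered as an assumption rather than a confirmed diagnosis: output emitted from the timer callback with context.output() appears to carry the timer's firing time, which is 10 seconds past the end of the window, and a later step that assigns end-of-window timestamps refuses to move an element backwards in time. The plain-Java sketch below models that check with java.time. The class and method names are hypothetical and this is not Beam's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Models an end-of-window timestamp check that rejects moving elements back in time. */
public class OutputTimestampSketch {

  /** Last timestamp inside the window [start, start + size). */
  public static Instant maxTimestamp(Instant windowStart, Duration size) {
    return windowStart.plus(size).minusMillis(1);
  }

  /** Assigns the end-of-window timestamp, failing if that would be earlier than the element. */
  public static Instant combineToEndOfWindow(Instant elementTimestamp, Instant windowMax) {
    if (elementTimestamp.isAfter(windowMax)) {
      throw new IllegalStateException(
          "moved element from " + elementTimestamp + " to earlier time " + windowMax);
    }
    return windowMax;
  }

  public static void main(String[] args) {
    Instant windowMax = maxTimestamp(Instant.EPOCH, Duration.ofSeconds(10));
    Instant timerTime = windowMax.plus(Duration.ofSeconds(10)); // where the expiry timer is set

    // Output stamped with window.maxTimestamp(), as in the corrected method: accepted.
    System.out.println(combineToEndOfWindow(windowMax, windowMax));

    // Output stamped with the timer's firing time: rejected.
    try {
      combineToEndOfWindow(timerTime, windowMax);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The two instants computed here, 1970-01-01T00:00:09.999Z and 1970-01-01T00:00:19.999Z, are the same two values that appear in the error message from the question.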
    

    【Comments】:
