【问题标题】:Is there a notion of virtual time in Apache Flink tests like there is in Reactor and RxJavaApache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念
【发布时间】:2019-02-24 18:55:15
【问题描述】:

在 RxJava 和 Reactor 中有虚拟时间的概念来测试依赖于时间的操作符。我不知道如何在 Flink 中做到这一点。例如,我整理了以下示例,我想在其中玩弄迟到的事件以了解它们是如何处理的。但是我无法理解这样的测试会是什么样子?有没有办法将 Flink 和 Reactor 结合起来让测试变得更好?

public class PlayWithFlink {

    public static void main(String[] args) throws Exception {

        final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};

        // TODO understand how BoundedOutOfOrderness is related to allowedLateness
        BoundedOutOfOrdernessTimestampExtractor<MyEvent> eventTimeFunction = new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getEventTime();
            }
        };

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<MyEvent> events = env.fromCollection(MyEvent.examples())
                .assignTimestampsAndWatermarks(eventTimeFunction);

        AggregateFunction<MyEvent, MyAggregate, MyAggregate> aggregateFn = new AggregateFunction<MyEvent, MyAggregate, MyAggregate>() {
            @Override
            public MyAggregate createAccumulator() {
                return new MyAggregate();
            }

            @Override
            public MyAggregate add(MyEvent myEvent, MyAggregate myAggregate) {
                if (myEvent.getTracingId().equals("trace1")) {
                    myAggregate.getTrace1().add(myEvent);
                    return myAggregate;
                }
                myAggregate.getTrace2().add(myEvent);
                return myAggregate;
            }

            @Override
            public MyAggregate getResult(MyAggregate myAggregate) {
                return myAggregate;
            }

            @Override
            public MyAggregate merge(MyAggregate myAggregate, MyAggregate acc1) {
                acc1.getTrace1().addAll(myAggregate.getTrace1());
                acc1.getTrace2().addAll(myAggregate.getTrace2());
                return acc1;
            }
        };

        KeySelector<MyEvent, String> keyFn = new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent myEvent) throws Exception {
                return myEvent.getTracingId();
            }
        };

        SingleOutputStreamOperator<MyAggregate> result = events
                .keyBy(keyFn)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .allowedLateness(Time.seconds(20))
                .sideOutputLateData(lateOutputTag)
                .aggregate(aggregateFn);


        DataStream lateStream = result.getSideOutput(lateOutputTag);

        result.print("SessionData");

        lateStream.print("LateData");

        env.execute();
    }
}

class MyEvent {
    private final String tracingId;
    private final Integer count;
    private final long eventTime;

    public MyEvent(String tracingId, Integer count, long eventTime) {
        this.tracingId = tracingId;
        this.count = count;
        this.eventTime = eventTime;
    }

    public String getTracingId() {
        return tracingId;
    }

    public Integer getCount() {
        return count;
    }

    public long getEventTime() {
        return eventTime;
    }

    public static List<MyEvent> examples() {
        long now = System.currentTimeMillis();
        MyEvent e1 = new MyEvent("trace1", 1, now);
        MyEvent e2 = new MyEvent("trace2", 1, now);
        MyEvent e3 = new MyEvent("trace2", 1, now - 1000);
        MyEvent e4 = new MyEvent("trace1", 1, now - 200);
        MyEvent e5 = new MyEvent("trace1", 1, now - 50000);
        return Arrays.asList(e1,e2,e3,e4, e5);
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "tracingId='" + tracingId + '\'' +
                ", count=" + count +
                ", eventTime=" + eventTime +
                '}';
    }
}

class MyAggregate {
    private final List<MyEvent> trace1 = new ArrayList<>();
    private final List<MyEvent> trace2 = new ArrayList<>();


    public List<MyEvent> getTrace1() {
        return trace1;
    }

    public List<MyEvent> getTrace2() {
        return trace2;
    }

    @Override
    public String toString() {
        return "MyAggregate{" +
                "trace1=" + trace1 +
                ", trace2=" + trace2 +
                '}';
    }
}

运行这个的输出是:

SessionData:1> MyAggregate{trace1=[], trace2=[MyEvent{tracingId='trace2', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace2', count=1, eventTime=1551034665081}]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034166081}], trace2=[]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace1', count=1, eventTime=1551034665881}], trace2=[]}

但是我希望看到 e5 事件的 lateStream 触发器应该在第一个事件触发前 50 秒。

【问题讨论】:

    标签: java rx-java apache-flink flink-streaming project-reactor


    【解决方案1】:

    如果你修改你的水印分配器是这样的

    AssignerWithPunctuatedWatermarks eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
        long maxTs = 0;
    
        @Override
        public long extractTimestamp(MyEvent myEvent, long l) {
            long ts = myEvent.getEventTime();
            if (ts > maxTs) {
                maxTs = ts;
            }
            return ts;
        }
    
        @Override
        public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
            return new Watermark(maxTs - 10000);
        }
    };
    

    那么你会得到你期望的结果。我不建议这样做 - 只是用它来说明正在发生的事情。

    这里发生的情况是 BoundedOutOfOrdernessTimestampExtractor 是一个周期性水印生成器,它只会每 200 毫秒(默认情况下)将水印插入到流中。因为你的工作在那之前很久就完成了,你的工作遇到的唯一水印是 Flink 在每个有限流结束时注入的水印(值为 MAX_WATERMARK)。迟到与水印有关,您预计迟到的事件设法在该水印之前到达。

    通过切换到带标点的水印,您可以强制水印更频繁地出现,或更精确地出现在流中的特定点。这通常是不必要的(并且过于频繁的水印会导致开销),但是当您想要对水印的顺序进行强有力的控制时会很有帮助。

    关于如何编写测试,你可以看看 Flink 自己的测试中使用的test harnesses,或者flink-spector

    更新:

    与 BoundedOutOfOrdernessTimestampExtractor 关联的时间间隔是对流的预期乱序程度的规范。在此范围内到达的事件不会被视为迟到,并且事件时间计时器在此延迟过去之前不会触发,从而为乱序事件提供时间到达。 allowedLateness 仅适用于窗口 API,并描述了框架保持窗口状态的正常窗口触发时间多长时间,以便仍然可以将事件添加到窗口并导致延迟触发。在这个额外的时间间隔之后,窗口状态被清除,后续事件被发送到侧面输出(如果配置)。

    因此,当您使用BoundedOutOfOrdernessTimestampExtractor&lt;MyEvent&gt;(Time.seconds(10)) 时,您不会说“在每个事件之后等待 10 秒,以防更早的事件可能仍然到达”。但是您是说您的事件最多应该有 10 秒的乱序。因此,如果您正在处理实时的事件流,这意味着您将等待最多 10 秒,以防更早的事件到达。 (如果您正在处理历史数据,那么您可能能够在 1 秒内处理 10 秒的数据,或者不能 - 知道您将等待 n 秒的事件时间过去并没有说明实际需要多长时间。 )

    有关此主题的更多信息,请参阅Event Time and Watermarks

    【讨论】:

    • 好的,所以没有办法模拟水印的时间,如果能够完全控制水印的生成方式,那就太好了。例如,我想编写测试只是为了测试 BoundedOutOfOrdernessTimestampExtractor。我想知道的是它如何影响流中迟到的概念。例如,如果我有 10 秒的 BoundedOutOfOrdernessTimestampExtractor 和 10 秒的 allowedLateness,那么迟到的消息会是什么?是不是最后一个时间戳后10秒后20秒到达?
    • 测试工具和 flink-spector 提供了控制水印以进行测试的方法,但可能不是您想要的方式。延迟与水印有关——allowedLateness 为 10 秒将允许处理时间戳最多落后(小于)当前水印 10 秒的记录。
    • 是的,但 BoundedOutOfOrdernessTimestampExtractor 还指定了我不理解的消息可以延迟多长时间的时间。我在此处指定的时间如何影响 allowedLateness?
    • 我已经更新了我的回复;希望这让事情变得更清楚了。
    • 感谢您的患者,但仍然没有得到它。如果 BoundedOutOfOrdernessTimestampExtractor 时间比允许的延迟时间长得多,会发生什么?例如,如果您将 BoundedOutOfOrdernessTimestampExtractor 设置为 50 秒并且 allowedLateness 设置为 10 秒,我们将永远不会等待 50 秒?我认为我错过了一些关键的见解来理解这一点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-04
    • 2012-09-12
    • 1970-01-01
    • 2016-11-14
    • 2011-11-27
    相关资源
    最近更新 更多