【问题标题】:How do I conditionally buffer a Grouped Observable/Flux based on Emitted Events?如何根据发出的事件有条件地缓冲分组的 Observable/Flux?
【发布时间】:2017-10-11 04:56:19
【问题描述】:

我正在尝试根据以下信息编写响应式流:

我们有一个实体事件流,其中每个事件都包含其实体的 ID 和一个 INTENT 或 COMMIT 类型。假设具有给定 ID 的 COMMIT 总是前面有一个或多个具有相同 ID 的 INTENT。当收到一个 INTENT 时,它应该按其 ID 分组,并且应该为该组打开一个“缓冲区”。当接收到同一组的 COMMIT 或配置的超时已过期时,应“关闭”缓冲区。应发出生成的缓冲区。

请注意,在收到关闭 COMMIT 之前,可能会收到多个 INTENT。 (编辑:)bufferDuration 应该保证在bufferDuration 自收到打开缓冲区的 INTENT 后,无论是否有 COMMIT,都会发出任何“打开的”缓冲区。

我最近的尝试如下:

public EntityEventBufferFactory {
    private final Duration bufferDuration;

    public EntityEventBufferFactory(Duration bufferDuration) {
        this.bufferDuration = bufferDuration;
    }

    public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
        return eventFlux.groupBy(EntityEvent::getId)
            .map(groupedFlux -> createGroupBuffer(groupedFlux))
            .flatMap(Function.identity());
    }

    protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
        return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
    }

    protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
        return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
    }

    protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.INTENT;
    }

    protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.COMMIT;
    }
}

这是我试图通过的测试:

@Test
public void entityEventsCanBeBuffered() throws Exception {
    FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();

    Duration bufferDuration = Duration.ofMillis(250);

    Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
    bufferFactory.setBufferDuration(bufferDuration);

    List<List<EntityEvent>> buffers = new ArrayList<>();
    bufferFlux.subscribe(buffers::add);

    EntityEvent intent = new EntityEvent();
    intent.setId("SOME_ID");
    intent.setEventType(EventType.INTENT);

    EntityEvent commit = new EntityEvent();
    commit.setId(intent.getId());
    commit.setEventType(EventType.COMMIT);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    Thread.sleep(500);

    assertEquals(2, buffers.size());
    assertFalse(buffers.get(0).isEmpty());
    assertFalse(buffers.get(1).isEmpty());
}

通过这个测试,我得到了两个发出的缓冲区,但它们都是空的。你会注意到,在挖掘之后,我不得不在某些点添加.publish(),以免从反应堆中得到一个异常,说This processor allows only a single Subscriber。这个问题的答案RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!" 是我采用这种方法的原因。

我目前正在使用 Reactor,但我认为这可以使用 Observable 和同名方法与 RxJava 进行一对一的转换。

有什么想法吗?

【问题讨论】:

    标签: java rx-java reactive-programming rx-java2 project-reactor


    【解决方案1】:

    我认为这是 Rx groupBy 的最终用例。来自文档:

    根据指定的标准对发布者发出的项目进行分组,并将这些分组的项目作为 GroupedFlowables 发出。发出的 GroupedPublisher 在其生命周期内仅允许单个订阅者,如果此订阅者在源终止之前取消,则具有相同键的源的下一次发送将触发新的 GroupedPublisher 发送。

    在您的情况下,此标准是 ID,并且在每个 GroupedPublisher 向您发出 takeUntil 时,类型是 COMMIT:

    source
    .groupBy(EntityEvent::getId)
    .flatMap(group -> 
        group
        .takeUntil(Flowable.timer(10,TimeUnit.SECONDS))
        .takeUntil(this::shouldCloseBufferOnEvent)
        .toList())
    

    编辑:添加时间条件。

    【讨论】:

    • 这很接近,但缺少的是在经过的缓冲区持续时间内发射缓冲区。我将编辑我的问题以使其更清楚,但我需要在bufferDuration 自收到第一个 INTENT(打开缓冲区)以来已失效后发出任何给定的缓冲区(假设未收到 COMMIT)
    • 简单:只需添加另一个终止条件 - 我已经通过示例编辑了答案。
    • 非常感谢您!我知道我要么丢失了一条信息,要么正在以不完全正确的方式考虑缓冲过程。事实证明,两者兼而有之。我会将适用于我的 Reactor 代码 sn-p 添加到此答案中。
    【解决方案2】:

    感谢 Tassos Bassoukos 提供的意见。以下 Reactor 代码适用于我:

    public EntityEventBufferFactory {
        private final Duration bufferDuration;
    
        public EntityEventBufferFactory(Duration bufferDuration) {
            this.bufferDuration = bufferDuration;
        }
    
        @Override
        public Flux<List<EntityEvent>> create(Flux<EntityEvent> eventFlux) {
            return eventFlux.groupBy(EntityEvent::getId)
                .map(this::createGroupBuffer)
                .flatMap(Function.identity());
        }
    
        protected Mono<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
            return groupFlux.take(bufferDuration)
                .takeUntil(this::shouldCloseBufferOnEvent)
                .collectList();
        }
    
        protected boolean shouldCloseBufferOnEvent(EntityEvent EntityEvent) {
            return EntityEvent.getEventType() == EventType.COMMIT;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-04
      • 1970-01-01
      • 1970-01-01
      • 2022-01-10
      相关资源
      最近更新 更多