【问题标题】:Combining Flux entry with previous and map将 Flux entry 与 previous 和 map 结合
【发布时间】:2018-12-18 16:28:12
【问题描述】:

问题

使用 Flux 如何访问前一个元素?

背景

我有一个外部事件流,它按顺序提供事件,该流的顺序是分派一个事件,然后立即分派另一个事件。然而,第二个事件的元数据在第一个事件中。

请注意,事件并不总是偶数。

我想要做的是将这些事件组合成一个事件流以供下游消费。

Flux#zip 看起来很有希望,但这意味着返回一个外部事件类型的对象。

初始代码

到目前为止,我得到的是。

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    }

    public Event getCurrent() {
        return current;
    }
}

/**
 * Not thread safe has to go on a single thread
 */
public class EventPairMemorizer implements Function<Event, EventPair> {
    Event previous = null;

    EventPair toPair(Event e) {
        EventPair pair = new EventPair(previous, e);
        previous = e;
        return pair;
    }

    @Override
    public EventPair apply(Event current) {
        return toPair(current);
    }
}

这部分是学习练习,部分是概念验证。

不相关的细节

我正在尝试使用 mysql-binlog-connector-java 来获取有关数据库中更改内容的流。

因此,如果我收到 EXT_WRITE_ROWS 事件,则前一个事件是 TABLE_MAP 事件。然后我想对TABLE_MAP 事件进行列查找(使用jdbc)。然后转换为一些对 JSON 友好的内部结构。

这同样适用于EXT_UPDATE_ROWS 事件。

所以想法代码看起来像

  1. onExternalEvent 推送到 Flux
  2. 检查事件类型。如果在 jdbc 线程上使用 Mono 匹配调用 jdbc
  3. 结合单声道和当前事件。
  4. 映射到内部类型。
  5. 发射到不同的流。
  6. 利润

【问题讨论】:

  • 注意,当需要访问序列中过去的元素时,反应式编程不一定是最合适的:尽管有一些方法可以实现,但对于简单问题,它们可能比命令式解决方案更迂回
  • @SimonBaslé 同意,但这部分是一种学习练习。我过去用 vert.x 做过一些编码,但这次有所不同,我正在尝试了解它是如何完成的。在我看来,有效连接到事件流、转换和提供另一个事件流的想法似乎是 Ractor 的目的,但我可能会弄错。

标签: java project-reactor


【解决方案1】:

重叠缓冲区怎么样?

使用buffer(2, 1),您将为每个元素打开一个缓冲区,每个缓冲区将包含 2 个元素。

然后,您可以忽略不以您感兴趣的事件结束的缓冲区,并获取您感兴趣的事件的先前值...

【讨论】:

  • 这看起来很合理并且确实有效。我不喜欢 Flux> 的界面,因为它看起来比实际可接受的更通用。然而,这似乎也是一个很好的权衡。
【解决方案2】:

您可以使用.scan()

Flux<EventPair> pairs = bridge.scan(new EventPair(null,null),(prevPair,newEvent)->
  new EventPair(prevPair.current,newEvent)
);

【讨论】:

    猜你喜欢
    • 2020-05-04
    • 2019-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多