【发布时间】:2018-12-18 16:28:12
【问题描述】:
问题
使用 Flux 如何访问前一个元素?
背景
我有一个外部事件流,它按顺序提供事件,该流的顺序是分派一个事件,然后立即分派另一个事件。然而,第二个事件的元数据在第一个事件中。
请注意,事件并不总是偶数。
我想要做的是将这些事件组合成一个事件流以供下游消费。
Flux#zip 看起来很有希望,但这意味着返回一个外部事件类型的对象。
初始代码
到目前为止,我得到的是。
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
Flux<Event> bridge = Flux.create(sink -> {
EventListener fluxListener = event -> {
sink.next(event);
};
client.registerEventListener(fluxListener);
});
bridge.subscribe(DemoApplication::printEvent);
bridge.map(new EventPairMemorizer());
public class EventPair {
private final Event previous;
private final Event current;
public EventPair(Event previous, Event current) {
this.previous = previous;
this.current = current;
}
/**
* @return `null` if no previous events.
*/
public Event getPrevious() {
return previous;
}
public Event getCurrent() {
return current;
}
}
/**
* Not thread safe has to go on a single thread
*/
public class EventPairMemorizer implements Function<Event, EventPair> {
Event previous = null;
EventPair toPair(Event e) {
EventPair pair = new EventPair(previous, e);
previous = e;
return pair;
}
@Override
public EventPair apply(Event current) {
return toPair(current);
}
}
这部分是学习练习,部分是概念验证。
不相关的细节
我正在尝试使用 mysql-binlog-connector-java 来获取有关数据库中更改内容的流。
因此,如果我收到 EXT_WRITE_ROWS 事件,则前一个事件是 TABLE_MAP 事件。然后我想对TABLE_MAP 事件进行列查找(使用jdbc)。然后转换为一些对 JSON 友好的内部结构。
这同样适用于EXT_UPDATE_ROWS 事件。
所以想法代码看起来像
- onExternalEvent 推送到 Flux
- 检查事件类型。如果在 jdbc 线程上使用 Mono 匹配调用 jdbc
- 结合单声道和当前事件。
- 映射到内部类型。
- 发射到不同的流。
- 利润
【问题讨论】:
-
注意,当需要访问序列中过去的元素时,反应式编程不一定是最合适的:尽管有一些方法可以实现,但对于简单问题,它们可能比命令式解决方案更迂回
-
@SimonBaslé 同意,但这部分是一种学习练习。我过去用 vert.x 做过一些编码,但这次有所不同,我正在尝试了解它是如何完成的。在我看来,有效连接到事件流、转换和提供另一个事件流的想法似乎是 Ractor 的目的,但我可能会弄错。
标签: java project-reactor