【问题标题】:How to properly use a Reactor Publisher如何正确使用 Reactor Publisher
【发布时间】:2016-03-12 16:07:48
【问题描述】:

我无法弄清楚如何使用 Reactor 正确实现发布者/订阅者场景。我有一个可行的解决方案,但实施对我来说似乎不正确:

我的问题是我需要手动实现发布者注册订阅者并传递事件:

public void publishQuotes(String ticker) throws InterruptedException {

// [..] Here I generate some "lines" to be publisher

for (Subscriber<? super String> subscriber : subscribers) {
    lineList.forEach(line -> subscriber.onNext(line));
}

}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
    subscribers.add(subscriber);

}

然后,我有一个工作队列处理器(应该是消费者):

WorkQueueProcessor<String> sink = WorkQueueProcessor.create();

// Here I subscribe to my publiser
publisher.subscribe(sink);

// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);

// Here I perform a number of stream transformations 

// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

它工作正常,但丑得要命。 In this example 取自 Spring Guides,他们使用 EventBus 将事件从发布者路由到消费者,但是,当我尝试将它与我的处理器链接时,我得到了这个编译器错误:

eventBus.on($("quotes"),sink);

The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)

我在这里迷路了,将发布者与处理器联系起来的最佳方式是什么?你会推荐使用 EventBus 吗?如果是这样,正确的调用是什么?

谢谢!

【问题讨论】:

    标签: java spring reactor project-reactor


    【解决方案1】:

    如果您使用 EventBus,您将通过

    发布您的线路
     eventBus.notify("quotes", Event.wrap(line);
    

    并通过以下方式订阅

    eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);
    

    其中“e”的类型为 Event

    【讨论】:

    • 感谢您的回答 Jan。我对必须订阅的类型感到困惑。如果我使用WorkQueueProcessor&lt;Event&lt;String&gt;&gt; 就可以了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-21
    • 2019-05-04
    相关资源
    最近更新 更多