【发布时间】:2015-03-29 00:39:04
【问题描述】:
我遇到了 reactor 2.0 版本的问题。也就是说,我正在尝试建立一个反应式信号流,将信号扇出到等待线程池中。我对 Rx 和 Reactive Cocoa 非常熟悉,但这里缺少一些基本的东西。
我有一个基本的转换如下:
WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64, {... Exception handle code here ...}
return objectStream
.partition(partitions)
.dispatchOn(dispatcher)
.merge()
.map(new Function<Object, Object>() {
@Override
public Object apply(Object o) {
try {
return extension.eval(o, null);
} catch (UnableToEvaluateException e) {
e.printStackTrace();
return null;
}
}
});
我已经尝试了大约七八种不同的方式,包括不同的调度程序等。我尝试将它分解为一个分组的事件流,分别处理每个元素,然后写入一个单独的流进行处理。在每种情况下,我要么在同一个线程上看到每个请求处理(它有效,而不是多线程),要么我收到我开始害怕的错误消息:
java.lang.IllegalStateException: Dispatcher provided doesn't support event ordering. For concurrent signal dispatching, refer to #partition()/groupBy() method and assign individual single dispatchers.
at reactor.core.support.Assert.state(Assert.java:387)
at reactor.rx.Stream.dispatchOn(Stream.java:720)
at reactor.rx.Stream.dispatchOn(Stream.java:650)
我尝试了以下方法:
- 手动进行分区/分组。
- 为前面的步骤明确设置单独的单线程调度程序(环)。
- 只是说 eff 它,没有功能,只是转储到我自己的队列中进行处理。
我在这里缺少什么?我不应该使用广播器来启动消息循环吗?我真的一点都不关心这里的订单执行。
(已编辑)
以下是我正在使用自己开发的横向扩展代码:
objectStream
.consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
final Object target = o;
tpe.execute(new Runnable(){
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p/>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
//System.out.println("On thread "+ Thread.currentThread().getName());
Timer.Context onNext = onNextTimer.time();
Timer.Context timer = callComponentTimer.time();
Object translated = extension.eval(target, null);
timer.close();
broadcaster.onNext(translated);
onNext.close();
} catch (UnableToEvaluateException e) {
e.printStackTrace();
}
}
});
编辑
好的,我更新如下:
MetricRegistry reg = DMPContext.getContext().getMetricRegistry();
de.init(null);
ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
reporter.start(10, TimeUnit.SECONDS);
final CountDownLatch latch = new CountDownLatch(COUNT);
final Function<String, Object> translator = JSON.from(Request.class);
String content = new String(Files.readAllBytes(Paths.get("/svn/DMPidea/Request.json")));
Broadcaster<String> stringBroadcaster = Broadcaster.create();
final Exec exec = new Exec();
stringBroadcaster
.partition(10)
.consume(new Consumer<GroupedStream<Integer, String>>() {
@Override
public void accept(GroupedStream<Integer, String> groupedStream) {
groupedStream.dispatchOn(Environment.cachedDispatcher()).map(translator).map(new Function<Object, Object>() {
@Override
public Object apply(Object o) {
try {
System.out.println("Got thread " +Thread.currentThread().getName());
return de.eval(o, null);
} catch (UnableToEvaluateException e) {
e.printStackTrace();
return null;
}
}
}).consume(new Consumer<Object>() {
@Override
public void accept(Object o) {
latch.countDown();
}
});
}
});
for (int i=0; i<COUNT; i++)
{
stringBroadcaster.onNext(content);
}
latch.await();
我仍然看到单线程执行:
得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1
【问题讨论】:
标签: java multithreading reactive-programming reactor