【问题标题】:Multithreading in Reactor 2.0 - why can't I spin signals out to multiple threadsReactor 2.0 中的多线程 - 为什么我不能将信号发送到多个线程
【发布时间】:2015-03-29 00:39:04
【问题描述】:

我遇到了 reactor 2.0 版本的问题。也就是说,我正在尝试建立一个反应式信号流,将信号扇出到等待线程池中。我对 Rx 和 Reactive Cocoa 非常熟悉,但这里缺少一些基本的东西。

我有一个基本的转换如下:

 WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64, {... Exception handle code here ...}


return objectStream
            .partition(partitions)
            .dispatchOn(dispatcher)
            .merge()
            .map(new Function<Object, Object>() {
                @Override
                public Object apply(Object o) {
                    try {
                        return extension.eval(o, null);
                    } catch (UnableToEvaluateException e) {
                        e.printStackTrace();
                        return null;
                    }

                }
            });

我已经尝试了大约七八种不同的方式,包括不同的调度程序等。我尝试将它分解为一个分组的事件流,分别处理每个元素,然后写入一个单独的流进行处理。在每种情况下,我要么在同一个线程上看到每个请求处理(它有效,而不是多线程),要么我收到我开始害怕的错误消息:

   java.lang.IllegalStateException: Dispatcher provided doesn't support event     ordering.  For concurrent signal dispatching, refer to #partition()/groupBy()     method and assign individual single dispatchers. 
at reactor.core.support.Assert.state(Assert.java:387)
at reactor.rx.Stream.dispatchOn(Stream.java:720)
at reactor.rx.Stream.dispatchOn(Stream.java:650)

我尝试了以下方法:

  1. 手动进行分区/分组。
  2. 为前面的步骤明确设置单独的单线程调度程序(环)。
  3. 只是说 eff 它,没有功能,只是转储到我自己的队列中进行处理。

我在这里缺少什么?我不应该使用广播器来启动消息循环吗?我真的一点都不关心这里的订单执行。

(已编辑)

以下是我正在使用自己开发的横向扩展代码:

objectStream
        .consume(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                final Object target = o;
                tpe.execute(new Runnable(){
                    /**
                     * When an object implementing interface <code>Runnable</code> is used
                     * to create a thread, starting the thread causes the object's
                     * <code>run</code> method to be called in that separately executing
                     * thread.
                     * <p/>
                     * The general contract of the method <code>run</code> is that it may
                     * take any action whatsoever.
                     *
                     * @see Thread#run()
                     */
                    @Override
                    public void run() {
                        try {
                            //System.out.println("On thread "+ Thread.currentThread().getName());
                            Timer.Context onNext = onNextTimer.time();
                            Timer.Context timer = callComponentTimer.time();
                            Object translated = extension.eval(target, null);
                            timer.close();
                            broadcaster.onNext(translated);
                            onNext.close();
                        } catch (UnableToEvaluateException e) {
                            e.printStackTrace();
                        }
                    }
                });

编辑

好的,我更新如下:

 MetricRegistry reg = DMPContext.getContext().getMetricRegistry();


    de.init(null);


    ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();

    reporter.start(10, TimeUnit.SECONDS);

    final CountDownLatch latch = new CountDownLatch(COUNT);

    final Function<String, Object> translator = JSON.from(Request.class);
    String content = new String(Files.readAllBytes(Paths.get("/svn/DMPidea/Request.json")));

    Broadcaster<String> stringBroadcaster = Broadcaster.create();

    final Exec exec = new Exec();


    stringBroadcaster
            .partition(10)
            .consume(new Consumer<GroupedStream<Integer, String>>() {
                @Override
                public void accept(GroupedStream<Integer, String> groupedStream) {

                    groupedStream.dispatchOn(Environment.cachedDispatcher()).map(translator).map(new Function<Object, Object>() {
                        @Override
                        public Object apply(Object o) {
                            try {
                                System.out.println("Got thread " +Thread.currentThread().getName());
                                return de.eval(o, null);
                            } catch (UnableToEvaluateException e) {
                                e.printStackTrace();
                                return null;
                            }
                        }
                    }).consume(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) {
                            latch.countDown();
                        }
                    });
                }
            });

    for (int i=0; i<COUNT; i++)
    {

        stringBroadcaster.onNext(content);

    }
    latch.await();

我仍然看到单线程执行:

得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1 得到线程 dispatcherGroup-1

【问题讨论】:

    标签: java multithreading reactive-programming reactor


    【解决方案1】:

    Reactor2.0 中的模式是使用单独的CachedDispatchers (RingBufferDispatchers) 而不是使用WorkQueueDispatcherThreadPoolExecutorDispatcher。这提供了将流拆分为不同的线程。这仅适用于将小型(通常是非阻塞)操作并行化为逻辑流。

    要在单独的线程池中完全非阻塞执行,请参阅我在底部的编辑。

    您将希望使用 Stream.groupBy() 或 Stream.partition() 将流拆分为多个流,每个流都可以在单独的线程上分派。

    1. Stream.groupBy() - 根据您返回的键分桶到流中
    2. Stream.partition([int]) - 根据流对象(信号)的 hashCode 分桶到流中,也可以选择分桶到您指定的桶数

    在您将流分区/分组为单独的流之后,您可以使用flatMap() 告诉您的新流分派到单独的线程上。

    .flatMap(stream -&gt; stream.dispatchOn(Environment.cachedDispatcher())

    调用Environment.cachedDispatcher() 实际上是从它们的池中获取CachedDispatcher (RingBufferDispatcher)。默认情况下,池的大小等于您计算机的处理器数量。 CachedDispatchers 是惰性创建的,因此在您对 .dispatchOn(Dispatcher) 的调用中执行此操作并不理想。

    要预先创建CachedDispatchers 的池(DispatcherSupplier),您可以使用Environment.newCachedDispatchers(int, [groupName])。以下是 ProjectReactor 文档中关于如何组合所有这些的示例:

        DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
        DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");
    
        Streams
            .range(1, 10)
            .groupBy(n -> n % 2 == 0) 
            .flatMap(stream -> stream
                    .dispatchOn(supplier1.get()) 
                    .log("groupBy")
            )
            .partition(5) 
            .flatMap(stream -> stream
                    .dispatchOn(supplier2.get()) 
                    .log("partition")
            )
            .dispatchOn(Environment.sharedDispatcher()) 
            .log("join")
            .consume();
    

    Reactor Docs: Partitioning Streams into separate threads

    请注意,在此示例中,他们还在最后一次 flatMap() 调用之后调用了 .dispatchOn(Environment.sharedDispatcher())。这是将流简单地加入到单个Dispatcher 线程中。在本例中为Environment.sharedDispatcher(),即另一个RingBufferDispatcher

    我使用此策略将我的流划分为单独的线程,以在单独的 CachedDispatchers 上并行进行阻塞调用,然后将它们重新加入主 Environment.sharedDispatcher() 上的单个线程中,所有这些都处于非阻塞/反应式方式,像这样:

        // Spring's RestTemplate for making simple REST (HTTP) calls
        RestTemplate restTemplate = new RestTemplate();
        List<String> urls = Arrays.asList("http://www.google.com", "http://www.imgur.com");
        Streams
            .from(urls)
            .groupBy(s -> s) // unique group (stream) per URL
            .flatMap(stream -> 
                // dispatch on separate threads per stream
                stream.dispatchOn(Environment.cachedDispatcher())
                     // blocking call in separate dispatching thread
                     .map(s -> restTemplate.getForObject(s, String.class)))
            // rejoin into single dispatcher thread
            .dispatchOn(Environment.sharedDispatcher())
            .consume(
                 s -> { // consumer
                     System.out.println("---complete in stream---");
                 },
                 t -> { // error consumer
                     System.out.println("---exception---");
                     System.out.println(t);
                 },
                 s -> { // complete consumer
                     latch.countDown();
                     System.out.println("---complete---");
                 });
    



    编辑: 对于并行执行阻塞操作,您可能仍希望使用线程池。在 Reactor 2.0 中,您可以使用由线程池支持的调度程序执行此操作,最好是 Executors.newCachedThreadPool(),因为它会重用线程并限制 GC 压力。

    我发现做到这一点的最简单方法是使用 ThreadPoolExecutorDispatcherExecutors.newCachedThreadPool() 支持的 EventBus(如果您查看函数,这实际上只是带有一些特定设置的 ThreadPoolExecutor创建 newCachedThreadPool):

        // dispatcher backed by cached thread pool for limited GC pressure
        Dispatcher dispatcher = new ThreadPoolExecutorDispatcher(1024, 1024, Executors.newCachedThreadPool());
    
        int n = 1000000;
        EventBus bus = EventBus.create(dispatcher);
        CountDownLatch latch = new CountDownLatch(n);
        bus.on($("topic"), (Event<Integer> ev) -> {
            blockingServiceCall(100); // block for 100ms
            latch.countDown();
            if (ev.getData() % 1000 == 0) {
                System.out.println(Thread.currentThread() + " " + Thread.activeCount());
            }
        });
    
        long start = System.currentTimeMillis();
        for(int i = 0; i < n; i++) {
            bus.notify("topic", Event.wrap(i));
        }
        latch.await();
        System.out.println("ops/sec: " + (n * 1.0) / ((System.currentTimeMillis() - start) / 1000.0) );
    

    如果您想重新使用 Streams,则需要在消费者内部完成阻塞调用后将其分派回 Stream。据我所知,您不能简单地将它们自动合并回一个流中,也不能通过 Stream.dispatchOn(threadPoolExecutorDispatcher) 直接使用由缓存线程池支持的 ThreadPoolExecutorDispatcher。

    【讨论】:

      【解决方案2】:

      只需浏览一下,就会尽快得到正确的答案。 但起初问题不是与您执行合并()然后执行您的逻辑的事实有关吗? Reactive Stream 是每个规范的单线程 onXXX 调用(在 RxJava 和 Akka Stream 中也是如此)。

      就像在 Rx 中一样,我只会在平面映射流中进行处理工作,或者如果您不需要合并回来(如果您考虑的话,这是一个争论点)。

      return objectStream
              .partition(partitions)
              .consume( partitionStream ->
                 partitionStream
                   .dispatchOn(Environment.cachedDispatcher()) 
                   .consume( o -> {
                      try {
                          return extension.eval(o, null);
                      } catch (UnableToEvaluateException e) {
                          e.printStackTrace();
                          return null;
                      }
      
                  }
              ));
      

      【讨论】:

      • WorkQueueDispatcher 在 Rx 中不直接支持,但是 cachedDispatcher 从 N(默认处理器数量)中返回一个池调度程序(默认为 ringBuffer),所以在我过去的示例中,我将分配 N 个不同的 N 个分区调度员,有效地扩大规模。另请注意,2.0 及更高版本有一个特定的非标准组件将取代 WorkQueueDispatcher 超时,RingBufferWorkProcessor 这是一个真正的反应式流组件,可以根据需要扩展任意数量的订阅者。
      • Pred,好吧,那么 dispatchOn 会从 2.0 的池中选择一个独立线程吗?当我打印线程名称时,它总是出现在主线程上,而不是单独的线程。我正在使用通用的静态环境设置。会不会是这个问题?
      • 所以,我测试了它的这个版本,奇怪的是,此时我仍然看到单线程行为。我看到线程说它来自 dispatcherGroup,但 threadId 始终是 dispatcherGroup-1,我没有看到当前创建的任何其他线程。
      【解决方案3】:

      我已经完成了合并和不合并。没有任何区别。这里的代码库是 2.0。一旦我点击了 dispatchOn,我就可以接受它是单线程的,但我需要利用多核(毕竟,这是利用 FRP 的一个重要原因)。我的基准测试没有显示任何线程发生,打印线程的名称也没有显示任何独立线程的扇出。

      请注意,我在这里使用的是 Java 7 而不是 8(而且,伙计,我会在这里杀死 lamda 函数及其语法)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2023-03-30
        • 2015-10-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-10
        相关资源
        最近更新 更多