【问题标题】:Synchronous SubmissionPublisher同步提交发布者
【发布时间】:2021-02-27 18:27:46
【问题描述】:

是否可以让订阅者与发布者在同一线程上运行(同步)? 我可以使用 CompletableFuture,但它只提供一个结果。但是,如果我需要将许多结果交付给订阅者怎么办。请查看这个小测试以获得更好的解释。

  @Test
  public void testCompletableFutureThreads() throws InterruptedException {
    CompletableFuture<String> f = new CompletableFuture<String>();
    f.thenAccept(new Consumer<String>() {
      @Override
      public void accept(String s) {
        System.out.println("accept " + s + " in " + Thread.currentThread().getName());
      }
    });
    Thread.sleep(200);
    System.out.println("send complete from " + Thread.currentThread().getName());
    f.complete("test");
    Thread.sleep(1000);
  }



  @Test
  public void testSubmissionPublisherThreads() throws InterruptedException {

    SubmissionPublisher<String> publisher = new SubmissionPublisher<String>();

    publisher.subscribe(new Flow.Subscriber<String>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(String item) {
        System.out.println("onNext in " + Thread.currentThread().getName() + " received " + item);
      }

      @Override
      public void onError(Throwable throwable) {
        System.err.println("onError in " + Thread.currentThread().getName());
        throwable.printStackTrace(System.err);
      }

      @Override
      public void onComplete() {
        System.err.println("onComplete in " + Thread.currentThread().getName());
      }
    });

    int i = 10;
    while (i-- > 0) {
      Thread.sleep(100);
      System.out.println("publisher from " + Thread.currentThread().getName());
      publisher.submit("" + System.currentTimeMillis());

    }
  }

【问题讨论】:

  • 您是否有一个订阅者和一个发布者,或者可以有多个订阅者和/或发布者?
  • 单个发布者和多个订阅者

标签: java reactive java.util.concurrent completable-future java-flow


【解决方案1】:

您可以使用SubmissionPublisher(Executor, int) 构造函数并提供一个Executor 来运行当前线程上的所有内容:

final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(new Executor() {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}, Flow.defaultBufferSize());

或者,如果您更喜欢 lambda:

final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize());

默认构造函数使用ForkJoinPool.commonPool()发送信号。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-04-20
    • 1970-01-01
    • 2022-08-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多