【问题标题】:CompletableFuture across microservices(JVM)跨微服务 (JVM) 的 CompletableFuture
【发布时间】:2018-09-26 15:52:07
【问题描述】:

第 1 步:我想让一个 CompletableFuture<String> asyncFuture微服务 A 中启动,通过 supplyAsync 运行异步任务。

第 2 步:然后通过从不同的微服务 B 中手动调用 asyncFuture.complete(T value) 手动完成相同的未来对象,这将由一些异步事件触发。

显然微服务A微服务B有不同的JVM。 实际上,微服务 A 和微服务 B 是同一微服务的不同实例,运行在 Kubernetes 的不同 pod 上。

在第 1 步和第 2 步之间,future 对象将存储在 Redis 中,微服务 B 可以安全地检索。


经过一些快速的谷歌搜索后,我想我会尝试以下几个解决方案:

1> HazelCast 的分布式执行器服务,我可以在调用时作为第二个参数传入

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

参考: http://docs.hazelcast.org/docs/2.3/manual/html/ch09.html

2>使用 apache ignite 中的共享 executorService

参考:https://apacheignite.readme.io/v1.2/docs/executor-service

不确定两者是否可行?另外我想知道以前有没有人处理过这样的事情?如果是这样,如果您能与我分享您的解决方案,我将不胜感激。

【问题讨论】:

  • 我的两分钱:您要求一个 jvm 完成由另一个 jvm 启动的线程。这听起来并不安全
  • 奇怪的是,你会通过supplyAsync() 创建你的未来,然后手动complete() 它。虽然没有严格禁止,但通常的模式是您的Supplier 将提供结果,因此您不需要调用complete()。否则Supplier 的结果将被忽略/丢失。

标签: java redis hazelcast ignite completable-future


【解决方案1】:

关于 Apache Ignite,如何协作节点(微服务)有很多选择。其中之一是连续查询 [1],它允许监听缓存上发生的数据修改。

例如,在服务 A 上,您可以创建 ContinuousQuery 并等待缓存中的值发生变化:

private String waitForValueChanged(IgniteCache<Integer, String> cache, Integer key) throws InterruptedException {
    ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

    qry.setInitialQuery(new ScanQuery<>((k, v) -> k == key));

    final CountDownLatch waitForValueChanged = new CountDownLatch(1);
    final AtomicReference<String> result = new AtomicReference<>();

    CacheEntryUpdatedListener<Integer, String> listener = new CacheEntryUpdatedListener<Integer, String>() {
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> iterable) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends Integer, ? extends String> entry: iterable) {
                result.set(entry.getValue());
            }

            waitForValueChanged.countDown();
        }
    };

    qry.setLocalListener(listener);

    try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry);) {
        waitForValueChanged.await(60000, TimeUnit.MILLISECONDS);
    }

    return result.get();
}

在服务 B 上,您只需将值放入缓存中即可“完成未来”:

private void completeFuture(IgniteCache<Integer, String> cache, Integer key, String value) {
    cache.put(key, value);
}

这是一个示例项目,它展示了连续查询如何工作 [2]。

[1]https://apacheignite.readme.io/docs#section-continuous-queries

[2]https://github.com/gromtech/ignite-continuous-query-example

【讨论】:

    猜你喜欢
    • 2015-09-03
    • 2015-07-24
    • 2015-01-07
    • 2017-07-12
    • 2019-05-24
    • 2017-04-15
    • 2019-03-21
    • 2015-06-10
    • 2015-07-24
    相关资源
    最近更新 更多