【问题标题】:CompleteableFuture in a loop contruct in a private Ethereum Blockchain私有以太坊区块链中的循环构造中的 CompletableFuture
【发布时间】:2018-07-06 14:30:51
【问题描述】:

我有一个私有的以太坊区块链,上面有 5 台机器进行挖掘。区块链的大小[区块数]截至目前为300。处理在后端Java上完成。

我需要以异步方式运行以下循环构造。循环的瓶颈是在执行以下命令期间:

EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();

该命令还可以返回一个Completablefuture<EthBlock> 对象,方法是用这里给出的supplyAsync() 结束它https://github.com/web3j/web3j#start-sending-requests 只需调用supplyAync().get() 即可删除并行性方面并使其同步运行。

public void businessLogic() throws Exception {
        recentBlocks = new ArrayList<EthBlock.Block>();
        for (long i = 1; i <= 300000; i++) {
            EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
            if (eb == null || eb.getBlock() == null) {
                continue;
            }
            EthBlock.Block block = eb.getBlock();
            recentBlocks.add(block);
        }
    }

我无法掌握将代码转换为 CompleteableFuture 可以操作的方式的机构。目标是将多个对web.ethGetBlockNumber(...).supplyAync() 的调用“分组”到一个集合中,并一次调用它们以更新将由EthBlock 对象(即recentBlocks)填充的数组。

这是我想出的:

public void businessLogic() throws Exception {
    recentBlocks = new ArrayList<EthBlock.Block>();
    List<CompleteableFuture> compFutures = new ArrayList<>();
    for (long i = 0, i <= 300000, i++){
        CompleteableFuture<EthBlock> compFuture = eb3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync();
        compFuture.thenAcceptAsync(eb -> // Doesn't look right
        EthBlock.Block block = eb.getBlock();
        recentBlock.add(block);)
        compFutures.add(compFuture);        
    }
    CompleteableFuture.allOf(compFutures).get();
}

实现 IntStream

    long start = System.nanoTime();
    recentBlocks = IntStream.rangeClosed(0, 300_000)
             .parallel()
             .mapToObj(i -> {
                try {
                    System.out.println("Current Thread -> " + Thread.currentThread());
                    return web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;
            })
             .filter(Objects::nonNull)
             .map(EthBlock::getBlock)
             .filter(Objects::nonNull)
             .collect(Collectors.toList());
    long stop = System.nanoTime();
    System.out.println("Time Elapsed: " + TimeUnit.MICROSECONDS.convert(stop-start, TimeUnit.NANOSECONDS));

【问题讨论】:

    标签: java concurrency blockchain ethereum


    【解决方案1】:

    假设生成的List 的顺序并不重要,您也许可以从并行流中受益,而不是依赖CompletableFuture

    IntStream.rangeClosed(0, 300_000)
             .parallel()
             .mapToObj(i -> web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send())
             .filter(Objects::nonNull)
             .map(EthBlock::getBlock)
             .filter(Objects::nonNull)
             .collect(Collectors.toList());
    

    因为你说这没有帮助,让我们尝试使用缓存线程池的ExecutorService

    List<EthBlock.Block> blocks = Collections.synchronizedList(new ArrayList<>(300_000));
    
    ExecutorService service = Executors.newCachedThreadPool();
    
    for (int i = 0; i <= 300_000; i++) {
        BigInteger number = BigInteger.valueOf(i);
    
        service.execute(() -> {
            EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(number), true).send();
    
            if (eb == null) {
                return;
            }
    
            EthBlock.Block block = eb.getBlock();
    
            if (block != null) {
                blocks.add(block);
            }
        });
    }
    

    【讨论】:

    • 我运行了您的代码并做了一些基准标记,以便将其与简单的 for 循环代码进行比较。奇怪的是,它并没有给我带来任何性能提升,这两种技术都需要 11 秒才能完成 50 次迭代 [部分范围为 300_000]。在我的Thread.currentThread() 中,它为每个IntStream 实例打印[http-nio-8080-exec-54,5,main] [在maptoObj 中给出。有没有办法强制它实例化可用于检索对象的最大线程数?
    • @user280930 查看我刚刚发布的编辑;希望这会有所帮助。
    • 谢谢!新代码是异步执行的,比以前的代码要快得多。 Executors... 完成 50 次调用 0.4 秒,比 11 秒有了巨大改进。奇怪的是,当我运行IntStream 方法时,它现在也在异步工作,System.currentThread() 打印出ForkJoinPool.commonPool-worker-1,5,main [唯一]。我不知道为什么它突然异步运行。很想知道为什么。它也没有在 1.4 秒内完成执行那么快。最后,你知道为什么我可以使用返回CompletableFuture&lt;EthBlock&gt;web3.eth..sendAsync 吗?
    • 我相信IntStream 解决方案只使用有限数量的线程,而ExecutorService 解决方案每个任务使用一个线程(并缓存它们以供重复使用)。不过,我不知道您在问为什么可以使用 CompletableFuture
    • 对不起,有一个错字。我的意思是要求web3j 提供给定here 的异步实现。我可以使用它的异步方法来返回CompletableFuture&lt;EthBlock&gt; 对象,而不是使用web3.eth...send()。从你给我的代码来看,它可以工作而且速度很快,但它完全忽略了库提供的 async 实现。难道没有任何可能使用它来执行上面给出的完全相同的任务吗?
    【解决方案2】:

    CompletableFuture 包含 get 的 Override: get(long timeout, TimeUnit unit)。如果它没有在特定时间内返回,您可以使用它通过设置超时来进行轮询。

    【讨论】:

    • 我将超时设置为一分钟 EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync().get(60,TimeUnit.SECONDS); 但它不会异步运行。在循环中运行 System.out.println("Thread.currentThread())'; 会打印出 -> Thread[http-nio-8080-exec-10,5,main],一遍又一遍。
    • 给一个超时不会导致它异步运行,你必须通过给一个小的超时来实现这个功能,比如 1 秒。如果超时,请稍后再试。明白了吗?
    • 是的,使用timeout 将允许我至少通过允许宽限期返回结果来获得结果。但是我需要eth.ethGetBlockNumber 的多个实例/线程来利用每个稍后返回结果的优势。一件一件做太慢了。
    猜你喜欢
    • 2018-12-05
    • 2017-08-07
    • 1970-01-01
    • 2019-12-09
    • 2018-08-28
    • 1970-01-01
    • 2018-11-03
    • 2017-10-28
    • 1970-01-01
    相关资源
    最近更新 更多