【问题标题】:How to execute dependent tasks in Java 8 without any blocking如何在 Java 8 中无任何阻塞地执行依赖任务
【发布时间】:2016-05-25 07:11:54
【问题描述】:

不久前我回答了这个问题:Executing Dependent tasks in parallel in Java 但是使用 future.get() 会阻塞当前线程,如果在一度。在 Java 中如何从 futures 中组合 futures?

【问题讨论】:

标签: java multithreading future


【解决方案1】:

我想我会自己回答这个问题,可以在 java 中使用 CompletableFutures 而不是 Futures。 CompletableFutures 允许通过 thenCombine 方法进行组合,该方法类似于 scalas flatMap。现在没有阻塞发生,只需要 3 个线程即可达到最快的时间。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class Barrista
{

    // number of threads used in executor
    static final int NOTHREADS = 3;

    // time of each task
    static final int HEATWATER = 1000;
    static final int GRINDBEANS = 1000;
    static final int FROTHMILK = 1000;
    static final int BREWING = 1000;
    static final int COMBINE = 1000;

    // method to simulate work (pause current thread without throwing checked exception)
    public static void pause(long t)
    {
        try
        {
            Thread.sleep(t);
        }
        catch(Exception e)
        {
            throw new Error(e.toString());
        }
    }

    // task to heat some water
    static class HeatWater implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Heating Water");
            pause(HEATWATER);
            return "hot water";
        }
    }

    // task to grind some beans
    static class GrindBeans implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Grinding Beans");
            pause(GRINDBEANS);
            return "grinded beans";
        }
    }

    // task to froth some milk
    static class FrothMilk implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Frothing some milk");
            pause(FROTHMILK);
            return "some milk";
        }
    }

    // task to brew some coffee
    static class Brew implements BiFunction<String,String, String>
    {
        @Override
        public String apply(String groundBeans, String heatedWater)
        {
            System.out.println("Brewing coffee with " + groundBeans + " and " + heatedWater);
            pause(BREWING);
            return "brewed coffee";
        }
    }

    // task to combine brewed coffee and milk
    static class Combine implements BiFunction<String,String, String>
    {
        @Override
        public String apply(String frothedMilk, String brewedCoffee)
        {
            System.out.println("Combining " + frothedMilk + " "+ brewedCoffee);
            pause(COMBINE);
            return "Final Coffee";
        }
    }

    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newFixedThreadPool(NOTHREADS);

        long startTime = System.currentTimeMillis();

        try
        {
            // create all the tasks and let the executor handle the execution order
            CompletableFuture<String> frothMilk =       CompletableFuture.supplyAsync(new FrothMilk(), executor);
            CompletableFuture<String> heatWaterFuture = CompletableFuture.supplyAsync(new HeatWater(), executor);
            CompletableFuture<String> grindBeans =      CompletableFuture.supplyAsync(new GrindBeans(), executor);

            CompletableFuture<String> brew = heatWaterFuture.thenCombine(grindBeans, new Brew());
            CompletableFuture<String> coffee =          brew.thenCombine(frothMilk,  new Combine());

            // final coffee
            System.out.println("Here is the coffee:" + coffee.get());

            // analyzing times:
            System.out.println("\n\n");
            System.out.println("Actual time: \t\t\t\t" + (System.currentTimeMillis() - startTime)/1000.0);

            // compute the quickest possible time:
            long path1 = Math.max(GRINDBEANS, HEATWATER)+ BREWING + COMBINE;
            long path2 = FROTHMILK + COMBINE;
            System.out.println("Quickest time multi-threaded:\t\t" + Math.max(path1, path2)/1000.0);

            // compute the longest possible time:
            long longestTime = HEATWATER + GRINDBEANS + FROTHMILK + BREWING + COMBINE;
            System.out.println("Quickest time single-threaded thread:\t" + longestTime/1000.0);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }

    }
}

【讨论】:

  • 这是一个非常好的解决方案,但发现了一个警告:在实践中,作业可能会失败(以状态的形式,例如,成功终止,或因失败而终止)但我仍然希望尽快获得并且在一次调试那些失败的工作之前尽可能完成许多工作。那么这里有什么好的异常处理策略呢?
  • 是的,你是对的@cpchung!解决这个问题的一种方法是使用 Monads,而容器上的映射是一个优雅的解决方案。在 Java 中,您几乎总是有 if(success) 语句来实现相同的目标,尽管通过一些额外的努力,您可以做得更好。
【解决方案2】:

Java 8 引入了 CompletableFuture,您不需要特别阻止 get 调用,除非您根据完成阶段触发回调。

可以显式完成的 Future(设置它的值和 status),并且可以用作 CompletionStage,支持依赖 完成后触发的函数和动作。

阅读更多documentation

在 java 8 之前,这个概念在 google groovy 库中可用,更多信息请参阅 documentation 和 spring 库 too

【讨论】:

  • 谢谢,是的,我在回答中使用了这些。当我问这个问题时,我确实准备好了这个问题的答案,但我认为这对人们有用。我很难找到如何做到这一点。
【解决方案3】:

Dexecutor 在这里进行救援。

免责声明我是所有者

依赖任务执行,用dexecutor轻松完成。

顺序

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addDependency(1, 2);
executor.addDependency(2, 3);
executor.addDependency(3, 4);
executor.addDependency(4, 5);
//Execution
executor.execute(ExecutionConfig.TERMINATING);

平行

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addIndependent(1);
executor.addIndependent(2);
executor.addIndependent(3);
executor.addIndependent(4);
//Execution
executor.execute(ExecutionConfig.TERMINATING);

甚至混合

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();

executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);

executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);

executor.execute(new ExecutionConfig().immediateRetrying(2));

更多详情请参考How Do I

【讨论】:

    猜你喜欢
    • 2023-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-20
    • 1970-01-01
    • 2011-01-24
    • 2021-08-27
    • 1970-01-01
    相关资源
    最近更新 更多