【问题标题】:Appropriate way to batch method invocations?批处理方法调用的适当方法?
【发布时间】:2016-01-04 12:13:32
【问题描述】:

假设如下:

public boolean doThisThing(SomeArg arg) {
    if(iAmAllowedToDoIt()) {
        doThing(arg);
        return true;
    } else {
        return false;
    }

假设 iAmAllowedToDoIt() 是一个非常昂贵的方法,并且 doThisThing() 被许多线程同时调用,并且我可以做任何事情,因为我可以做任何事情,有没有办法批量调用 iAmAllowedToDoIt () 这样我会在并发数据结构中累积 SomeArgs,并在解决 iAmAllowedToDoIt 一次不修改 API 后立即在所有这些上调用 doThing?那个代码会是什么样子?我无法弄清楚如何像这样在不修改 API 的情况下高效地执行多线程批处理。一个理想的答案将包括不依赖于在固定时间段内阻塞来累积 doThisThing() 调用的东西。

理想情况下,它最终会是这样的:

  1. 调用 doThisThing
  2. 异步调用 iAmAllowedToDoIt
  3. 在 (2) 之前对 doThisThing 的所有调用都会重新调用阻塞,直到 (2) 返回
  4. (2) 返回,如果为 true,则为所有阻塞的 doThisThing()s 调用 doThing

【问题讨论】:

  • 你的意思是像缓存iAmAllowedToDoIt()的结果?
  • @RealSkeptic 不,缓存结果是不够的,因为缓存必须在未来某个未知时间失效,此时 iAmAllowedToDoIt() 将返回 false,如果不调用它就无法确定。
  • 请不要尝试在 cmets 中放入多行信息或代码。将其添加到问题中。
  • allow方法返回true一次后,什么时候可以再次返回false?
  • 它可以随时返回 false 或 true,只是在给定 iAmAllowedToDoIt() 调用期间发生的所有对 doThisThing() 的调用共享 iAmAllowedToDoIt() 的结果

标签: java performance optimization


【解决方案1】:

您的包含对象可能有一个AtomicReference,其中包含一个CompleteableFuture,用于计算iAmAllowedToDoIt()。对doThisThing() 的其他调用只是等待可完成未来的完成(如果存在),否则创建一个新的,使用适当的 CAS 循环来避免一次创建多个实例。

完成后,引用再次设置为 null,以便稍后调用该方法的线程可以开始新的计算。

【讨论】:

    【解决方案2】:

    您可以执行以下操作(实现类似于@the8472 提出的算法):

    public class Test {
    
        /**
         * Lock used to guard accesses to allowedFuture
         */
        private final Object lock = new Object();
    
        /**
         * The future result being computed, reset to null as soon as the result is known
         */
        private FutureTask<Boolean> allowedFuture = null;
    
        private static final Random RANDOM = new Random();
    
        public boolean doThisThing() throws ExecutionException, InterruptedException {
            if (iAmAllowedToDoIt()) {
                System.out.println("doing it...");
                return true;
            }
            else {
                System.out.println("not doing it...");
                return false;
            }
        }
    
        private boolean iAmAllowedToDoIt() throws ExecutionException, InterruptedException {
            // if true, this means that this thread is the one which must really compute if I am allowed
            boolean mustCompute = false;
    
            // The Future holding the result which is either the cached one, or a new one stored in the cache
            FutureTask<Boolean> result;
    
            synchronized (lock) {
                // if no one has computed the result yet, or if it has been computed and thus must be recomputed
                // then create it
                if (this.allowedFuture == null) {
                    mustCompute = true;
                    this.allowedFuture = new FutureTask<>(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            System.out.println("computing if I am allowed...");
                            Thread.sleep(RANDOM.nextInt(3000));
                            boolean allowed = RANDOM.nextBoolean();
                            System.out.println(allowed ? "allowed!" : "not allowed!");
                            return allowed;
                        }
                    });
                }
                result = this.allowedFuture;
            }
            if (mustCompute) {
                allowedFuture.run();
    
                // reset the cache to null, so that the next thread recomputes the result
                synchronized (lock) {
                    this.allowedFuture = null;
                }
            }
            return result.get();
        }
    
        public static void main(String[] args) {
            Test test = new Test();
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(RANDOM.nextInt(6000));
                        test.doThisThing();
                    }
                    catch (ExecutionException | InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            for (int i = 0; i < 50; i++) {
                Thread t = new Thread(r);
                t.start();
            }
        }
    }
    

    【讨论】:

    • 啊,很有道理,我以前不知道未来的任务。
    • 有一个竞争条件不会影响设置 mustCompute 的正确性 - 从理论上讲,您可以对从未进入 if(mustCompute) 块的线程进行实时锁定以运行未来,但这不太可能。不过谢谢你! =)
    猜你喜欢
    • 2013-03-11
    • 2013-06-25
    • 2016-08-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-22
    • 1970-01-01
    • 2015-02-28
    相关资源
    最近更新 更多