【Question title】: How to add implicit batching for the client?
【Posted】: 2020-02-28 03:52:57
【Question description】:

Consider the following code:

Client code:

public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}

Client-side service:

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}

Server-side service:

@Slf4j
public class MyServerSideService {
    //single threaded bottleneck service
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started", batchReq);
        try {
            Thread.sleep(100);
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());

        } catch (InterruptedException e) {
            return null;
        } finally {
            log.info("Req for {} finished", batchReq);

        }
    }
}

Main:

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}

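According to the logs, this takes about 4 minutes 22 seconds, which is too long. I think it can be improved significantly, and I would like to implement implicit batching. MyClientSideService should collect requests and, once there are 50 of them (the preconfigured batch size) or a preconfigured timeout expires, send them to MyServerSideService and route the results back to the clients. The protocol must be synchronous, so each client has to block until it gets its result.

I tried to write this with CountDownLatch and CyclicBarrier, but my attempts were far from successful.

How can I achieve my goal?

P.S.

The following works, with limitations, if the return type of requestRowBatch is changed from List<String> to Map<Integer, String> so that mapping requests to responses is delegated to the server. It works only when I send [...]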

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue<>(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean();

    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);


    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();

        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

Update

Based on Malt's answer, I was able to develop the following:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture<String>>> queue = new ArrayBlockingQueue<>(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

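But it does not work if the number of requests is not a multiple of the batch size.

【Comments】:

  • Maybe try something like CoalescingBulkloader, which lets a cache batch calls implicitly.
  • @BenManes Do you mean I should build my solution on the ideas in the source you linked, or use the linked code directly as a library?
  • Either works. The code follows an idea similar to the one @Malt suggested. It is a contributed example and not part of the library, so it is not directly supported. Libraries such as Reactor might also be of interest.
  • @Ben Manes Frankly, I don't see how to apply it to my example. Could you provide more details?
  • @Ben Manes, please see the update to the question.

Tags: java multithreading concurrency java.util.concurrent batching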


【Solution 1】:

If the return type of requestRowBatch is changed from List<String> to Map<Integer, String>, so that mapping requests to responses is delegated to the server, I can build the following solution:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer timeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final BlockingQueue<Pair<Integer, CompletableFuture>> queue = new LinkedBlockingQueue<>();

    private final Lock lock = new ReentrantLock();
    private final Condition requestAddedCondition = lock.newCondition();


    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
        startQueueDrainer();
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        while (!queue.offer(Pair.of(req, future))) {
            log.error("Can't add {} to the queue. Retrying...", req);
        }
        lock.lock();
        try {
            requestAddedCondition.signal();
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }

    private void startQueueDrainer() {
        new Thread(() -> {
            log.info("request");
            while (true) {
                ArrayList<Pair<Integer, CompletableFuture>> requests = new ArrayList<>();
                if (queue.drainTo(requests, batchSize) > 0) {
                    log.info("drained {} items", requests.size());
                    Map<Integer, String> serverResponseMap = requestBatch(requests.stream().map(Pair::getKey).collect(Collectors.toList()));
                    requests.forEach(pair -> {
                        String response = serverResponseMap.get(pair.getKey());
                        CompletableFuture<String> value = pair.getValue();
                        value.complete(response);
                    });
                } else {
                    lock.lock();
                    try {
                        while (queue.size() == 0) {
                            try {
                                log.info("Waiting on condition");
                                requestAddedCondition.await(timeoutMillis, TimeUnit.MILLISECONDS);
                                log.info("Waking up on condition");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }

            }
        }).start();
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
        return myServerSideService.requestRowBatch(requestList);
    }
}

It looks like a working solution, but I am not sure it is optimal.
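
The code above passes a Collection<Integer> to requestRowBatch and expects a Map<Integer, String> back, but the modified server is not shown. A minimal sketch of what it might look like, assuming the same simulated 100 ms delay as in the question; the class name MapReturningServerSideService is hypothetical. One batch can contain the same request number more than once (all 20 client threads ask for 0..99), so the sketch passes a merge function to Collectors.toMap, which otherwise throws IllegalStateException on duplicate keys.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical variant of MyServerSideService that returns request -> response pairs.
public class MapReturningServerSideService {
    // Still a single-threaded bottleneck, as in the question.
    public synchronized Map<Integer, String> requestRowBatch(Collection<Integer> batchReq) {
        try {
            Thread.sleep(100); // simulated cost per batch, as in the question
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new IllegalStateException("Interrupted while processing batch", e);
        }
        return batchReq.stream().collect(Collectors.toMap(
                req -> req,                  // key: the request itself
                req -> String.valueOf(req),  // value: the "row" for that request
                (first, second) -> first));  // duplicate requests share one response
    }

    public static void main(String[] args) {
        Map<Integer, String> rows =
                new MapReturningServerSideService().requestRowBatch(Arrays.asList(1, 2, 2, 3));
        System.out.println(rows);
    }
}
```

Because the result is keyed by request, two callers that asked for the same number in one batch both read the same map entry, which is what the drainer's serverResponseMap.get(pair.getKey()) lookup relies on.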

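【Comments】:

    【Solution 2】:

    Your MyClientSideServiceCompletableFuture solution sends a request to the server every time something is added to the queue; it does not wait for the batch to fill up. You are also using a BlockingQueue and adding an unnecessary condition and lock on top of it. BlockingQueue already supports blocking with a timeout, so the condition is not needed.

    You can simplify your solution like this:

    A request is sent to the server only when the batch is full, or when the timeout has expired and the batch is not empty.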

    private void startQueueDrainer() {
    
            new Thread(() -> {
                log.info("request");
                ArrayList<Pair<Integer, CompletableFuture>> batch = new ArrayList<>(batchSize);
                while (true) {
                    try {
                        batch.clear(); //clear batch
                        long startTime = System.currentTimeMillis();
                        long timeToWait = timeoutMillis;
    
                        while (timeToWait > 0 && batch.size() < batchSize) {
                            Pair<Integer, CompletableFuture> request = queue.poll(timeToWait, TimeUnit.MILLISECONDS);
                            if (request != null) {
                                batch.add(request);
                            }
                            // remaining time = total budget minus the time elapsed since this batch was started
                            timeToWait = timeoutMillis - (System.currentTimeMillis() - startTime);
                        }
    
                        if (!batch.isEmpty()) {
                            // either the timeout has expired or the batch is full
                            log.info("send {} requests to server", batch.size());
                            Map<Integer, String> serverResponseMap = requestBatch(batch.stream().map(Pair::getKey).collect(Collectors.toList()));
                            batch.forEach(pair -> {
                                String response = serverResponseMap.get(pair.getKey());
                                CompletableFuture<String> value = pair.getValue();
                                value.complete(response);
                            });
                        } else {
                            log.info("Waited {} ms but the batch is still empty", System.currentTimeMillis() - startTime);
                        }
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    

    Change the method requestSingleRow so that it no longer uses the lock:

      public String requestSingleRow(int req) {
            CompletableFuture<String> future = new CompletableFuture<>();
    
            while (!queue.offer(Pair.of(req, future))) {
                log.error("Can't add {} to the queue. Retrying...", req);
            }
    
            try {
                return future.get();
            } catch (Exception e) {
                return "Exception";
            }
        }
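
The timed wait in the drainer loop above can also be expressed against a fixed deadline, which avoids tracking elapsed time by hand. The following is only a sketch under assumptions: BatchCollector and collectBatch are hypothetical names, and the helper is generic so it can be tried without the Pair and CompletableFuture types. As in the answer, the time window starts when collection starts, so an empty list is a possible result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the names are illustrative and not part of the answer's code.
public class BatchCollector {
    // Collects up to maxSize elements, waiting at most timeoutMillis in total.
    public static <T> List<T> collectBatch(BlockingQueue<T> queue, int maxSize, long timeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // the time window is over, return what was collected
            }
            T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (item == null) {
                break; // poll timed out, nothing more arrived in time
            }
            batch.add(item);
            // Take whatever is already queued without waiting, up to the batch limit.
            queue.drainTo(batch, maxSize - batch.size());
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 7; i++) {
            queue.add(i);
        }
        System.out.println(collectBatch(queue, 5, 200).size()); // full batch, no waiting needed
        System.out.println(collectBatch(queue, 5, 200).size()); // partial batch, returned after the timeout
    }
}
```

Using System.nanoTime() for the deadline is a deliberate choice in this sketch: unlike System.currentTimeMillis(), it is not affected by wall-clock adjustments.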
    

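    【Comments】:

      【Solution 3】:

      You can use CompletableFuture.
      Have the threads that call MyClientSideService put their requests into a Queue (possibly a BlockingQueue) and get a new CompletableFuture in return. The calling thread can call CompletableFuture.get() to block until the result is ready, or carry on with other work.

      The CompletableFuture is stored in MyClientSideService together with the request. When you reach 50 requests (and therefore 50 CompletableFuture instances), have the client-side service send the batch request.

      When the request completes, call CompletableFuture.complete(value) on each CompletableFuture instance in the queue to notify the client threads that the response is ready. This unblocks a client that has called a blocking method such as CompletableFuture.get(), or makes the value available immediately if get() is called later.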
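
The steps above can be sketched roughly as follows. This is a minimal, size-triggered sketch only: the class name FutureBatcher, the nested Pending holder, and the Function-based backend are hypothetical stand-ins for MyClientSideService and the server call. It does not handle a partially filled last batch, which would need a timeout in addition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: callers get a future, and the batch is sent once it is full.
public class FutureBatcher {
    private static final class Pending {
        final int request;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Pending(int request) { this.request = request; }
    }

    private final int batchSize;
    private final Function<List<Integer>, Map<Integer, String>> backend; // stands in for the server call
    private final List<Pending> pending = new ArrayList<>();

    public FutureBatcher(int batchSize, Function<List<Integer>, Map<Integer, String>> backend) {
        this.batchSize = batchSize;
        this.backend = backend;
    }

    // Registers a request and returns a future that completes when its batch is answered.
    public CompletableFuture<String> submit(int request) {
        Pending mine = new Pending(request);
        List<Pending> full = null;
        synchronized (pending) {
            pending.add(mine);
            if (pending.size() >= batchSize) {
                full = new ArrayList<>(pending); // this caller takes over the full batch
                pending.clear();
            }
        }
        if (full != null) {
            send(full); // outside the lock, so other callers can keep queuing
        }
        return mine.future;
    }

    private void send(List<Pending> batch) {
        try {
            List<Integer> requests =
                    batch.stream().map(p -> p.request).distinct().collect(Collectors.toList());
            Map<Integer, String> responses = backend.apply(requests);
            batch.forEach(p -> p.future.complete(responses.get(p.request)));
        } catch (RuntimeException e) {
            batch.forEach(p -> p.future.completeExceptionally(e)); // never leave callers blocked
        }
    }

    public static void main(String[] args) {
        FutureBatcher batcher = new FutureBatcher(2, reqs ->
                reqs.stream().collect(Collectors.toMap(r -> r, r -> String.valueOf(r))));
        CompletableFuture<String> first = batcher.submit(1);  // batch not full yet
        CompletableFuture<String> second = batcher.submit(2); // fills the batch and triggers the call
        System.out.println(first.join() + " " + second.join());
    }
}
```

A synchronous client can keep returning String by calling join() or get() on the returned future, so the CompletableFuture does not have to leak into MyClient.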

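      【Comments】:

      • In that case, if I understand you correctly, MyClient would have to expect a Future[String] instead of a String. Also, I don't see how the tail of the queue gets processed: if I submit 99 tasks, the last 49 are never executed.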