【发布时间】:2020-02-28 03:52:57
【问题描述】:
让我们考虑以下代码:
客户端代码:
public class MyClient {
private final MyClientSideService myClientSideService;
public MyClient(MyClientSideService myClientSideService) {
this.myClientSideService = myClientSideService;
}
public String requestRow(Integer req) {
return myClientSideService.requestSingleRow(req);
}
}
客户端服务:
public class MyClientSideService {
private final MyServerSideService myServerSideService;
public MyClientSideService(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
}
}
服务器端服务:
@Slf4j
public class MyServerSideService {
//single threaded bottleneck service
public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
log.info("Req for {} started");
try {
Thread.sleep(100);
return batchReq.stream().map(String::valueOf).collect(Collectors.toList());
} catch (InterruptedException e) {
return null;
} finally {
log.info("Req for {} finished");
}
}
}
主要:
@Slf4j
public class MainClass {
public static void main(String[] args) {
MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int m = 0; m < 100; m++) {
int k = m;
log.info("Response is {}", myClient.requestRow(k));
}
}).start();
}
}
}
根据日志,它大约需要 4 分 22 秒,但它太多了。我认为它可能会得到显着改善。我想实现隐式批处理。所以MyClientSideService 应该收集请求,当它变成 50(它是预配置的批量大小)或一些预配置的超时到期时,然后请求 MyServerSideService 并将路由结果返回给客户端。协议应该是同步的,所以客户端必须被阻塞直到得到结果。
我尝试使用CountDownLatches 和CyclicBarriers 编写代码,但我的尝试远未成功。
我怎样才能实现我的目标?
附言
如果将requestRowBatch 的返回类型List<String> 替换为Map<Integer, String> 以将请求和响应映射委托给服务器,则以下工作存在限制。仅当我发送 时它才有效
@Slf4j
public class MyClientSideService {
private final Integer batchSize = 25;
private final Integer maxTimeoutMillis = 5000;
private final MyServerSideService myServerSideService;
private final Queue<Integer> queue = new ArrayBlockingQueue(batchSize);
private final Map<Integer, String> responseMap = new ConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean();
private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);
public MyClientSideService(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
queue.offer(req);
if (!started.compareAndExchange(false, true)) {
log.info("Start batch collecting");
startBatchCollecting();
}
startBatchRequestLatch.countDown();
try {
log.info("Awaiting batch response latch for {}...", req);
awaitBatchResponseLatch.await();
log.info("Finished awaiting batch response latch for {}...", req);
return responseMap.get(req);
} catch (InterruptedException e) {
e.printStackTrace();
return "EXCEPTION";
}
}
private void startBatchCollecting() {
new Thread(() -> {
try {
log.info("Await startBatchRequestLatch");
startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
log.info("await of startBatchRequestLatch finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
responseMap.putAll(requestBatch(queue));
log.info("Releasing batch response latch");
awaitBatchResponseLatch.countDown();
}).start();
}
public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
return myServerSideService.requestRowBatch(requestList);
}
}
更新
根据 Malt 的回答,我能够开发以下内容:
@Slf4j
public class MyClientSideServiceCompletableFuture {
private final Integer batchSize = 25;
private final Integer maxTimeoutMillis = 5000;
private final MyServerSideService myServerSideService;
private final Queue<Pair<Integer, CompletableFuture>> queue = new ArrayBlockingQueue(batchSize);
private final AtomicInteger counter = new AtomicInteger(0);
private final Lock lock = new ReentrantLock();
public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
CompletableFuture<String> future = new CompletableFuture<>();
lock.lock();
try {
queue.offer(Pair.of(req, future));
int counter = this.counter.incrementAndGet();
if (counter != 0 && counter % batchSize == 0) {
log.info("request");
List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
Map<Integer, String> serverResponseMap = requestBatch(requests);
queue.forEach(pair -> {
String response = serverResponseMap.get(pair.getKey());
CompletableFuture<String> value = pair.getValue();
value.complete(response);
});
queue.clear();
}
} finally {
lock.unlock();
}
try {
return future.get();
} catch (Exception e) {
return "Exception";
}
}
public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
return myServerSideService.requestRowBatch(requestList);
}
}
但如果 size 不是批大小的倍数,它就不起作用
【问题讨论】:
-
也许可以试试CoalescingBulkloader 之类的东西,它可以让缓存隐式地批量调用。
-
@BenManes 您的意思是根据您链接的源的想法构建我的解决方案,还是您的意思是我应该使用您直接链接的库来构建解决方案?
-
两者都可以。该代码遵循@Malt 提出的类似想法。这是一个贡献的示例,不是库的一部分,因此不直接支持。还有像
Reactor这样的库可能会很有趣。 -
@Ben Manes 坦率地说,我不知道如何将它应用到我的示例中。能否请您提供更多详细信息?
-
@Ben Manes,请查看主题更新
标签: java multithreading concurrency java.util.concurrent batching