编辑:如果您使用可以在多个进程/服务器之间进行协调的排队程序,则下面的设置仍然可以在集群上工作,例如RabbitMQ.
您还可以使用仅使用数据库的更简单的排队算法,其缺点是需要轮询(而像 RabbitMQ 这样的系统允许线程阻塞,直到消息可用)。创建一个 Requests 表,其中有一列作为主键的唯一 requestIds(例如随机 UUID)、一个 timestamp 列、一个 respourceType 列和一个整数 requestedQuantity 列。您还需要一个日志表,其中包含一个唯一的 requestId 列作为主键、一个 timestamp 列、一个 resourceType 列、一个整数 requestQuantity 列和一个布尔/tinyint/whatever @987654332 @专栏。
当客户端请求一定数量的 ResourceX 时,它会生成一个随机 UUID,并使用 UUID 作为 requestId 在 Requests 表中添加一行,然后轮询 Logs 表中的 requestId。如果success 列为真,则请求成功,否则失败。
拥有数据库的服务器为每个资源分配一个线程或进程,例如ProcessX 负责 ResourceX。 ProcessX从Requests表中检索resourceType = ResourceX所在的所有行,按时间戳排序,然后从Requests中删除;然后它按顺序处理每个请求,为每个成功的请求递减一个内存计数器,并在处理请求结束时更新资源表上的 ResourceX 的数量。然后它将每个请求及其success 状态写入日志表。然后它再次从 requestType = RequestX 的 Requests 中检索所有请求,等等。
使用自动增量整数作为 Requests 主键,并让 ProcessX 按主键而不是时间戳排序可能会更有效。
一种选择是为每个资源分配一个DAOThread - 这个线程是唯一访问该资源的数据库表的线程,因此在数据库级别没有锁定。 Workers(例如 Web 会话)使用并发队列请求资源数量 - 下面的示例使用 Java BlockingQueue,但大多数语言将具有某种可以使用的并发队列实现。
public class Request {
final int value;
final BlockingQueue<ReturnMessage> queue;
}
public class ReturnMessage {
final int value;
final String resourceType;
final boolean isSuccess;
}
public class DAOThread implements Runnable {
private final int MAX_CHANGES = 10;
private String resourceType;
private int quantity;
private int changeCount = 0;
private DBTable table;
private BlockingQueue<Request> queue;
public DAOThread(DBTable table, BlockingQueue<Request> queue) {
this.table = table;
this.resourceType = table.select("resource_type");
this.quantity = table.select("quantity");
this.queue = queue;
}
public void run() {
while(true) {
Requester request = queue.take();
if(request.value <= quantity) {
quantity -= request.value;
if(++changeCount > MAX_CHANGES) {
changeCount = 0;
table.update("quantity", quantity);
}
request.queue.offer(new ReturnMessage(request.value, resourceType, true));
} else {
request.queue.offer(new ReturnMessage(request.value, resourceType, false));
}
}
}
}
public class Worker {
final Map<String, BlockingQueue<Request>> dbMap;
final SynchronousQueue<ReturnMessage> queue = new SynchronousQueue<>();
public class WorkerThread(Map<String, BlockingQueue<Request>> dbMap) {
this.dbMap = dbMap;
}
public boolean request(String resourceType, int value) {
dbMap.get(resourceType).offer(new Request(value, queue));
return queue.take();
}
}
Worker 将资源请求发送到相应的 DAOThread 队列; DAOThread 按顺序处理这些请求,如果请求的值不超过数量,则更新本地资源数量并返回成功,否则保持数量不变并返回失败。数据库更新十次后才更新,减少IO量; MAX_CHANGES 越大,从系统故障中恢复的过程就越复杂。您还可以有一个专用的 IOThread 来执行所有数据库写入 - 这样您就不需要复制任何日志记录或计时(例如,应该有一个 Timer 每隔几秒钟将当前数量刷新到数据库)。
Worker 使用一个 SynchronousQueue 来等待来自 DAOThread 的响应(一个 SynchronousQueue 是一个 BlockingQueue 只能容纳一个项目);如果 Worker 在自己的线程中运行,您可能希望将其替换为标准的多项目 BlockingQueue,以便 Worker 可以按任何顺序处理 ReturnMessages。
有一些数据库,例如Riak 具有对计数器的原生支持,因此这可能会提高您的 IO 吞吐量并减少或消除对 MAX_CHANGES 的需求。
您可以通过引入BufferThreads 来缓冲对DAOThreads 的请求,从而进一步提高吞吐量。
public class BufferThread implements Runnable {
final SynchronousQueue<ReturnMessage> returnQueue = new SynchronousQueue<>();
final int BUFFERSIZE = 10;
private DAOThread daoThread;
private BlockingQueue<Request> queue;
private ArrayList<Request> buffer = new ArrayList<>(BUFFERSIZE);
private int tempTotal = 0;
public BufferThread(DAOThread daoThread, BlockingQueue<Request> queue) {
this.daoThread = daoThread;
this.queue = queue;
}
public void run() {
while(true) {
Request request = queue.poll(100, TimeUnit.MILLISECONDS);
if(request != null) {
tempTotal += request.value;
buffer.add(request);
}
if(buffer.size() == BUFFERSIZE || request == null) {
daoThread.queue.offer(new Request(tempTotal, returnQueue));
ReturnMessage message = returnQueue.take();
if(message.isSuccess()) {
for(Request request: buffer) {
request.queue.offer(new ReturnMessage(request.value, daoThread.resourceType, message.isSuccess));
}
} else {
// send unbuffered requests to DAOThread to see if any can be satisfied
for(Request request: buffer) {
daoThread.queue.offer(request);
}
}
buffer.clear();
tempTotal = 0;
}
}
}
}
Worker 将他们的请求发送到 BufferThreads,然后等待直到他们缓冲了 BUFFERSIZE 请求或等待 100 毫秒以等待通过缓冲区 (Request request = queue.poll(100, TimeUnit.MILLISECONDS)) 的请求,此时他们转发缓冲的消息给DAOThread。每个DAOThread 可以有多个缓冲区 - 而不是向工作人员发送Map<String, BlockingQueue<Request>>,而是发送Map<String, ArrayList<BlockingQueue<Request>>>,每个BufferThread 一个队列,工作人员使用计数器或随机数生成器来确定哪个BufferThread 发送请求。请注意,如果 BUFFERSIZE 太大和/或如果您有太多 BufferThreads,那么在等待缓冲区填满时,Worker 将遭受较长的暂停时间。