【问题标题】:Algorithm for concurrent access to resource(s) on database并发访问数据库资源的算法
【发布时间】:2014-11-15 09:04:47
【问题描述】:

不久前,我们实施了一个仓库管理应用程序,用于跟踪我们商店中每种产品的数量。我们用数据库锁(select for update)解决了并发访问数据的问题,但是当许多客户端尝试从同一个商店消费产品数量时,这种方法会导致性能不佳。请注意,我们只管理一小部分产品类型(少于 10 个),因此并发程度可能很重(而且,我们不关心库存重新填充)。我们曾考虑将每个资源数量拆分为较小的“桶”,但这种方法可能会导致客户端尝试消耗大于每个桶容量的数量:我们应该管理桶合并等等...... 我的问题是:这个问题有一些被广泛接受的解决方案吗?我也找过学术文章,但主题似乎太宽泛了。

附: 1: 我们的应用程序运行在集群环境中,所以我们不能依赖应用程序的并发控制。 该问题旨在找到一种算法,该算法以不同于单行的方式构造和管理数据,但保留数据库事务(使用或不使用锁)具有的所有优势。

附: 2:为了您的信息,我们管理了大量类似的仓库,示例集中在一个,但我们将所有数据保存在一个数据库中(价格都相同,等等)。

【问题讨论】:

  • 请说明您希望并行执行哪些操作?选择、计算和更新?
  • 主要问题与更新产品数量有关。例如,目前我们有一行产品“A”,数量为 500。客户通过减去一些金额来更新此数量。这种减法的数量级可能从每个客户的少于 10 到 100 或 200 不等;但实际上,这种情况不太常见。我们会寻找一些与数据库锁管理不严格相关但与资源分区/合并相关的算法,以降低单个产品记录的并发程度。显然客户不应该使资源数量为负数
  • 您尝试使用“选择更新”解决什么问题?您使用的是什么 RDBMS?如果您不使用过时的 MySQL,您的 RDBMS 可以为您提供无锁一致读取。
  • 我们使用 Oracle 10g。实际上问题与 DBMS 本身无关。当您开始一个涉及许多表的事务时,无论我们是否愿意,我们都必须确定有关读取的策略。如果我们不锁定行,则在事务期间其他人可能会读取一个将在事务结束时更改的值(脏读),因为在减去之前我们必须读取当前值;此外,由于其他情况(其他产品不可用、未定义价格等),交易可能会失败。
  • 只读事务应该简单地使用 Oracles 快照隔离来获得一致且无锁的读取。写入器确实需要锁定,但每秒每行的写入量应该很少。我是不是误会了什么?

标签: database algorithm concurrency locks


【解决方案1】:

编辑:如果您使用可以在多个进程/服务器之间进行协调的排队程序,则下面的设置仍然可以在集群上工作,例如RabbitMQ.

您还可以使用仅使用数据库的更简单的排队算法,其缺点是需要轮询(而像 RabbitMQ 这样的系统允许线程阻塞,直到消息可用)。创建一个 Requests 表,其中有一列作为主键的唯一 requestIds(例如随机 UUID)、一个 timestamp 列、一个 respourceType 列和一个整数 requestedQuantity 列。您还需要一个日志表,其中包含一个唯一的 requestId 列作为主键、一个 timestamp 列、一个 resourceType 列、一个整数 requestQuantity 列和一个布尔/tinyint/whatever @987654332 @专栏。

当客户端请求一定数量的 ResourceX 时,它会生成一个随机 UUID,并使用 UUID 作为 requestId 在 Requests 表中添加一行,然后轮询 Logs 表中的 requestId。如果success 列为真,则请求成功,否则失败。

拥有数据库的服务器为每个资源分配一个线程或进程,例如ProcessX 负责 ResourceX。 ProcessX从Requests表中检索resourceType = ResourceX所在的所有行,按时间戳排序,然后从Requests中删除;然后它按顺序处理每个请求,为每个成功的请求递减一个内存计数器,并在处理请求结束时更新资源表上的 ResourceX 的数量。然后它将每个请求及其success 状态写入日志表。然后它再次从 requestType = RequestX 的 Requests 中检索所有请求,等等。

使用自动增量整数作为 Requests 主键,并让 ProcessX 按主键而不是时间戳排序可能会更有效。


一种选择是为每个资源分配一个DAOThread - 这个线程是唯一访问该资源的数据库表的线程,因此在数据库级别没有锁定。 Workers(例如 Web 会话)使用并发队列请求资源数量 - 下面的示例使用 Java BlockingQueue,但大多数语言将具有某种可以使用的并发队列实现。

public class Request {
  final int value;
  final BlockingQueue<ReturnMessage> queue;
}

public class ReturnMessage {
  final int value;
  final String resourceType;
  final boolean isSuccess;
}

public class DAOThread implements Runnable {
  private final int MAX_CHANGES = 10;
  private String resourceType;
  private int quantity;
  private int changeCount = 0;
  private DBTable table;
  private BlockingQueue<Request> queue;

  public DAOThread(DBTable table, BlockingQueue<Request> queue) {
    this.table = table;
    this.resourceType = table.select("resource_type");
    this.quantity = table.select("quantity");
    this.queue = queue;
  }

  public void run() {
    while(true) {
      Requester request = queue.take();
      if(request.value <= quantity) {
        quantity -= request.value;
        if(++changeCount > MAX_CHANGES) {
          changeCount = 0;
          table.update("quantity", quantity);
        }
        request.queue.offer(new ReturnMessage(request.value, resourceType, true));
      } else {
        request.queue.offer(new ReturnMessage(request.value, resourceType, false));
      }
    }
  }
}

public class Worker {
   final Map<String, BlockingQueue<Request>> dbMap;
   final SynchronousQueue<ReturnMessage> queue = new SynchronousQueue<>();

   public class WorkerThread(Map<String, BlockingQueue<Request>> dbMap) {
     this.dbMap = dbMap;
   }

   public boolean request(String resourceType, int value) {
     dbMap.get(resourceType).offer(new Request(value, queue));
     return queue.take();
   }
}

Worker 将资源请求发送到相应的 DAOThread 队列; DAOThread 按顺序处理这些请求,如果请求的值不超过数量,则更新本地资源数量并返回成功,否则保持数量不变并返回失败。数据库更新十次后才更新,减少IO量; MAX_CHANGES 越大,从系统故障中恢复的过程就越复杂。您还可以有一个专用的 IOThread 来执行所有数据库写入 - 这样您就不需要复制任何日志记录或计时(例如,应该有一个 Timer 每隔几秒钟将当前数量刷新到数据库)。

Worker 使用一个 SynchronousQueue 来等待来自 DAOThread 的响应(一个 SynchronousQueue 是一个 BlockingQueue 只能容纳一个项目);如果 Worker 在自己的线程中运行,您可能希望将其替换为标准的多项目 BlockingQueue,以便 Worker 可以按任何顺序处理 ReturnMessages。

有一些数据库,例如Riak 具有对计数器的原生支持,因此这可能会提高您的 IO 吞吐量并减少或消除对 MAX_CHANGES 的需求。

您可以通过引入BufferThreads 来缓冲对DAOThreads 的请求,从而进一步提高吞吐量。

public class BufferThread implements Runnable {
  final SynchronousQueue<ReturnMessage> returnQueue = new SynchronousQueue<>();
  final int BUFFERSIZE = 10;

  private DAOThread daoThread;
  private BlockingQueue<Request> queue;
  private ArrayList<Request> buffer = new ArrayList<>(BUFFERSIZE);
  private int tempTotal = 0;

  public BufferThread(DAOThread daoThread, BlockingQueue<Request> queue) {
    this.daoThread = daoThread;
    this.queue = queue;
  }

  public void run() {
    while(true) {
      Request request = queue.poll(100, TimeUnit.MILLISECONDS);
      if(request != null) {
        tempTotal += request.value;
        buffer.add(request);
      }
      if(buffer.size() == BUFFERSIZE || request == null) {
        daoThread.queue.offer(new Request(tempTotal, returnQueue));
        ReturnMessage message = returnQueue.take();
        if(message.isSuccess()) {
          for(Request request: buffer) {
            request.queue.offer(new ReturnMessage(request.value, daoThread.resourceType, message.isSuccess));
          }
        } else {
          // send unbuffered requests to DAOThread to see if any can be satisfied
          for(Request request: buffer) {
            daoThread.queue.offer(request);
          }
        }
        buffer.clear();
        tempTotal = 0;
      }
    }
  }
}

Worker 将他们的请求发送到 BufferThreads,然后等待直到他们缓冲了 BUFFERSIZE 请求或等待 100 毫秒以等待通过缓冲区 (Request request = queue.poll(100, TimeUnit.MILLISECONDS)) 的请求,此时他们转发缓冲的消息给DAOThread。每个DAOThread 可以有多个缓冲区 - 而不是向工作人员发送Map&lt;String, BlockingQueue&lt;Request&gt;&gt;,而是发送Map&lt;String, ArrayList&lt;BlockingQueue&lt;Request&gt;&gt;&gt;,每个BufferThread 一个队列,工作人员使用计数器或随机数生成器来确定哪个BufferThread 发送请求。请注意,如果 BUFFERSIZE 太大和/或如果您有太多 BufferThreads,那么在等待缓冲区填满时,Worker 将遭受较长的暂停时间。

【讨论】:

  • 感谢您的回复。不幸的是,应用程序运行在集群环境中,所以不能采用线程的方式(我们想到了分布式锁机制,但这应该是最后的选择,因为它带来的相关问题)。相反,我们更喜欢在数据库本身上操作数据结构的策略,而不是完全排除锁。关于 Riak 的提示似乎很有趣。但是,我将用这些细节来改进这个问题。再次感谢您!
  • @piero86 我已经编辑了解决集群问题的答案的顶部
猜你喜欢
  • 2012-06-18
  • 1970-01-01
  • 2013-05-31
  • 2018-11-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-27
  • 2016-05-19
相关资源
最近更新 更多