【问题标题】:PHP concurrency issue, multiple simultaneous requests; mutexes?PHP并发问题,多个同时请求;互斥体?
【发布时间】:2015-08-17 05:32:45
【问题描述】:

所以我刚刚意识到 PHP 可能同时运行多个请求。昨晚的日志似乎显示有两个请求进来,是并行处理的;每个都触发了从另一台服务器导入数据;每个人都试图在数据库中插入一条记录。一个请求在尝试插入另一个线程刚刚插入的记录时失败(导入的数据带有 PK;我没有使用递增的 ID):SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '865020' for key 'PRIMARY' ...

  1. 我是否正确诊断了这个问题?
  2. 我应该如何解决这个问题?

以下是部分代码。我已经删除了大部分内容(日志记录,从数据中创建患者以外的其他实体),但以下内容应包括相关的 sn-ps。请求命中 import() 方法,该方法本质上为要导入的每条记录调用 importOne()。注意 importOne() 中的 save 方法;这是一个 Eloquent 方法(使用 Laravel 和 Eloquent),它将生成 SQL 以根据需要插入/更新记录。

public function import()
{
        $now = Carbon::now();
        // Get data from the other server in the time range from last import to current import
        $calls = $this->getCalls($this->getLastImport(), $now);
        // For each call to import, insert it into the DB (or update if it already exists)
        foreach ($calls as $call) {
            $this->importOne($call);
        }
        // Update the last import time to now so that the next import uses the correct range
        $this->setLastImport($now);
}

private function importOne($call)
{
    // Get the existing patient for the call, or create a new one
    $patient = Patient::where('id', '=', $call['PatientID'])->first();
    $isNewPatient = $patient === null;
    if ($isNewPatient) {
        $patient = new Patient(array('id' => $call['PatientID']));
    }
    // Set the fields
    $patient->given_name = $call['PatientGivenName'];
    $patient->family_name = $call['PatientFamilyName'];
    // Save; will insert/update appropriately
    $patient->save();
}

我猜该解决方案需要在整个导入块周围使用互斥锁?如果一个请求无法获得互斥锁,它就会继续处理其余的请求。想法?

编辑:请注意,这不是严重故障。异常被捕获并记录下来,然后像往常一样响应请求。并且导入在另一个请求上成功,然后该请求将照常响应。用户并不聪明;他们甚至不知道导入,这不是请求的主要焦点。所以真的,我可以让它保持原样运行,除了偶尔的异常,没有什么不好的事情发生。但是,如果有一个修复程序可以防止完成额外的工作/不必要地向其他服务器发送多个请求,那可能值得追求。

EDIT2:好的,我已经尝试使用flock() 实现锁定机制。想法?下面的工作吗?我将如何对这个添加进行单元测试?

public function import()
{
    try {
        $fp = fopen('/tmp/lock.txt', 'w+');
        if (flock($fp, LOCK_EX)) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            flock($fp, LOCK_UN);
            // Log success.
        } else {
            // Could not acquire file lock. Log this.
        }
        fclose($fp);
    } catch (Exception $ex) {
        // Log failure.
    }
}

EDIT3:关于锁的以下替代实现的想法:

public function import()
{
    try {
        if ($this->lock()) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            $this->unlock();
            // Log success
        } else {
            // Could not acquire DB lock. Log this.
        }
    } catch (Exception $ex) {
        // Log failure
    }
}

/**
 * Get a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function lock()
{
    return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}

/**
 * Release a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function unlock()
{
    return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}

【问题讨论】:

  • 我什至没有阅读您的问题内容,​​并给您投了赞成票。感谢上帝,有人问了一个真正的问题,而不仅仅是修复这个错字,如何四舍五入,如何查询数据库!
  • 是的,并发是个问题。有很多方法可以解决这个问题,视情况而定。锁定、乐观锁定、互斥令牌、咨询锁……这一切都取决于给定情况的最佳解决方案。虽然我也对一个严肃的问题感到高​​兴,但我不确定这是否可以在一个答案中合理回答......
  • 您是否尝试过使用 memcache 构建自己的互斥体/信号量?如果只有一台服务器正在写入数据库,它将对您有所帮助。
  • 用flock() 编写了一个类似互斥锁的机制……这看起来合理吗?请参阅 OP 进行编辑。知道我将如何对此进行单元测试吗?如何同时点击两次 import() 方法...?
  • 提醒任何想使用文件锁的人;当您不控制服务器上文件系统的权限时,它会让人头疼。 /s

标签: php concurrency eloquent mutex


【解决方案1】:

您似乎没有竞争条件,因为 ID 来自导入文件,并且如果您的导入算法正常工作,那么每个线程都有自己的工作分片要完成并且应该永远不要与他人发生冲突。现在似乎有 2 个线程正在接收创建同一患者的请求,并且由于算法错误而相互冲突。

确保每个生成的线程都从导入文件中获取一个新行,并且仅在失败时重复。

如果你不能这样做,并且想坚持使用互斥锁,使用文件锁似乎不是一个很好的解决方案,因为现在你解决了应用程序中的冲突,而它实际上发生在你的数据库中。数据库锁也应该快得多,并且总体上是一个更体面的解决方案。

请求一个数据库锁,像这样:

$db -> exec('LOCK TABLES table1 WRITE, table2 WRITE');

当您写入锁定的表时,您可能会遇到 SQL 错误,因此请在您的 Patient->save() 周围加上 try catch。

更好的解决方案是使用条件原子查询。一个数据库查询,其中也有条件。你可以使用这样的查询:

INSERT INTO targetTable(field1) 
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))

【讨论】:

  • 多个同时导入是不可取的,我只想要一个,所以不需要实现一些东西来在线程之间共享工作。是什么让您说冲突发生在数据库中?我同意数据库层是异常的来源,但那是因为两个线程正在做他们不应该在应用程序层做的事情。是否可以使用数据库锁来防止它们同时导入?我想我希望他们不要完全阻止写入,因为其他人可能正在通过正常使用(而不是导入)读取/保存。
  • 啊,我认为这是单独的线程,我的错。你确实想要一个写锁,因为你的应用程序正在读取状态,然后根据它之前假设的状态进行处理。在您读取状态和根据状态写入更新之间的时间段内,您不希望状态同时发生变化。确切地说,其他 scipts 同时更改相同数据的用例将使您的脚本相互咬合。另外,请注意,表上的写锁仍然允许读取,只是不允许更新/插入。
  • 我建议你阅读数据库锁定:乐观和悲观锁定(AKA 读写锁定),这篇文章给了一个很好的提示:stackoverflow.com/questions/129329/… 这并不简单,但我相信它是强制性的让每个程序员都意识到能够编写弹性软件。
  • 谢谢,罗伊。您介意看看 OP,我尝试过使用互斥锁。在应用程序级别而不是数据库级别执行意味着我只向另一台服务器发送一个请求,因此可以尽快响应对主要服务的后续“冲突”请求。想法?
  • 锁似乎以正确的方式实现,请注意您不会获得表锁。因此,与数据库通信的任何其他应用程序都不会像使用表锁那样受到限制。数据库的其他客户端可以通过不检查它来“解决”您的命名锁。此外,如果您想在 1 个请求中完成所有操作,则不应在所有这些实体上调用 save 并建立 1 个大查询。现在在我看来,每个 save();会发出查询。此外,当你的 mysql 连接关闭时,你的锁会自动释放。
【解决方案2】:

您的示例代码将阻止第二个请求,直到第一个请求完成。您需要为flock() 使用LOCK_NB 选项来立即返回错误而不是等待。

是的,您可以在文件系统级别或直接在数据库中使用锁定或信号量。

在您需要每个导入文件只处理一次的情况下,最好的解决方案是为每个导入文件创建一个包含行的 SQL 表。在导入开始时,您插入正在导入的信息,因此其他线程将知道不再处理它。导入完成后,您将其标记为这样。 (然后几个小时后,您可以检查表格以查看导入是否真的完成。)

此外,最好在单独的脚本上执行此类一次性持久的操作,例如在单独的脚本上导入,而不是在向访问者提供普通网页时。例如,您可以安排每晚的 cron 作业,该作业将获取导入文件并进行处理。

【讨论】:

  • 导入是为了与另一个数据库保持同步,需要接近实时;因此,不是每晚和每个请求:/ 谢谢,我会调查 LOCK_NB!
  • 实际上,您已提示我研究锁定数据库。使用 get_lock() 和 release_lock() 似乎可行?我稍后会发布我的实现...
  • 对于近实时同步 (MySQL),我建议使用 percona 工具集中的免费工具 pt-table-sync percona.com/doc/percona-toolkit/2.2/pt-table-sync.html 。我每 5 分钟从 cron 使用一次(甚至可以每 1 分钟运行一次)
  • 听起来是个有用的工具!不过,我不是在复制数据;获取数据,重组和添加不同的字段以及删除多余的小助手应用程序等等。这是一个原型作品。我也无法控制其他服务器;我有点明白我得到了什么。不过谢谢!
【解决方案3】:

我看到三个选项:
- 使用互斥量/信号量/其他一些标志 - 不容易编码和维护
- 使用DB内置事务机制
- 使用队列(如 RabbitMQ 或 0MQ)将消息连续写入 DB

【讨论】:

  • 我想我宁愿避免只在数据库级别进行并发检查,因为为时已晚;到那时,我已经访问了另一台服务器以获取要导入的数据两次。我宁愿检查发生得足够早,以防止第二个请求在 import() 方法中做任何工作。所以似乎只有第一个选项可以满足,对吗?
  • 我认为 MQ 是一个不错的选择 - 将您的控制结构放在一个地方,让它决定必须插入什么。
  • 目前,一个名为 Eloquent 的标准框架正在处理所有的数据库交互。也许我不明白你的建议,但似乎很难在队列中加入该框架?而且我仍然觉得检查应该更早发生,这样对其他服务器的第二次请求就不会发生。
  • 这不是一个完整且有用的答案...建议解决方案方向时请使用 cmets,并在您对问题有完整答案/解决方案时提交答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-17
  • 2021-06-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多