【问题标题】:Consistency in postgresql with locking and select for updatepostgresql 中的一致性与锁定和选择更新
【发布时间】:2011-06-27 18:56:25
【问题描述】:

我有一个可以支持一定数量的并发操作的应用程序。这由 postgres 中的“槽”表表示。当节点上线时,它们会在表中插入一些行,每个槽一个。当作业占用插槽时,它们会更新表中占用其中一个插槽的行,并在完成时再次释放它。

插槽表如下所示:

CREATE TABLE slots (
    id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
    node_name TEXT NOT NULL,
    job_name TEXT
);

在任何时候,它都有一些半固定数量的行,每行可能填写也可能不填写 job_name。

当一个新作业想要启动时,它会运行这些查询来获取它应该运行的节点的名称:

BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    LIMIT 1
    FOR UPDATE;

(从游标中读出node_name和id)

UPDATE slots
    SET job_name = %(job_name)s
    WHERE id = %(slot_id)s;
COMMIT;

这通常能够在不丢失任何更新的情况下声明行,但具有更高级别的并发性,在执行许多 SELECT ... FOR UPDATE 和 UPDATE 查询时只会声明几行。最终结果是,我们最终运行的作业远远多于它们的插槽。

我是否犯了锁定错误?有没有更好的方法来解决这个问题?不使用表锁的东西?

事务级别 SERIALIZABLE 并没有削减它,只有少数行被填充。

我使用的是 postgresql 8.4 版。

【问题讨论】:

  • 好吧,我会在更新之前粘贴一个调试选择/通知组合,以显示有多少行与您的 job_name = %(job_name)s 限制匹配(您使用的任何语言都会自动执行 '引用' %(foo)s 语法?)在其他新闻中,我会在保留步骤期间进行安全检查,以检查 job_name 是否已经保留了一个插槽。
  • 我会试一试。谢谢,赛斯。 (是的,%(variable)s 东西是由我正在使用的 python 接口填充的。)
  • 我希望您能够处理这样一个事实,即您的第一个限制为 1 的请求可以成功,而无需返回任何可用的插槽。您使用了 2 个不同的表名 'slots' 和 'server_slots',错了吗?还有为什么不释放的时候复用slot_id,如果2个job同名会释放太多slot呢?
  • regilero:我确实会处理没有可用插槽、请求代码获取并返回错误的情况。我刚刚修复了 slot/server_slots 问题。作业名称是全局唯一的。实际上是 UUID。
  • 我刚刚编辑以简化问题。进一步的实验表明,我可以在不释放任何插槽的情况下删除更新。

标签: python sql postgresql


【解决方案1】:
BEGIN; 
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE; 
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;

这似乎适用于已提交读。它只是sql(与您的代码相同),可以一次调用(更快)执行。

@Seth Robertson:没有 LOCK TABLE 和没有 while 循环是不安全的。

如果同时有事务A和事务B:A选择第一行,B选择第一行。 A 将锁定并更新行,B 必须等到 A 提交。然后 B 将重新检查条件 job_name IS NULL。为假,B 不会更新 - B 不会选择下一行,只会重新检查并返回空结果。

@joegester:SELECT FOR UPDATE 不是问题,因为所有表都已锁定。

也许还有另一种方法可以完成工作 - 如果您删除并插入行(在其他表中?)而不是设置 NULL。但我不确定如何。

【讨论】:

  • 另一个好主意。我实际上曾经这样做过,但现在我不记得为什么将其更改为 SELECT FOR UPDATE 版本。它可能不仅仅是更快,如果 SELECT FOR UPDATE 是问题,也许这会绕过这个问题。
  • 你能解释一下为什么它不安全吗?或者您是说,如果没有空闲插槽,我放入的 while 循环不存在,它将丢失更新。这是肯定的,即使你锁定了整张桌子。
  • 我不推荐 ACCESS EXCLUSIVE 模式。这将导致您的应用程序被 pg_dump 阻止(并且您正在备份数据库,对吗?)。我建议改用 EXCLUSIVE 模式。见stackoverflow.com/questions/6507475/…
【解决方案2】:

好吧,我用 perl 编写了一个程序来模拟正在发生的事情,因为我认为你所说的不可能。事实上,在运行我的模拟之后,即使我关闭了锁定,我也没有遇到任何问题(因为 SELECT … FOR UPDATEUPDATE 应该进行必要的锁定)。

我在 PG 8.3 和 PG 9.0 上运行了它,它在两个位置都运行良好。

我敦促您尝试该程序和/或尝试一个 python 版本,以获得一个很好的紧凑测试用例,您可以与全班分享。如果它确实有效,您可以调查其中的差异,如果它无效,您可以使用其他人可以玩的东西。

#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };

sub worker($)
{
  my ($i) = @_;
  my ($job);

  my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

  my ($x) = 0;
  while(++$x)
  {
#    $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
    my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");

    if ($#id < 0)
    {
      $dbh->rollback;
      sleep(.5);
      next;
    }
    $job = "$$-$i-($x)";
    $dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
    $dbh->commit || die "Cannot commit\n";
    last;
  }
  if (!$job)
  {
    print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
    return;
  }
  else
  {
    print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
  }
  sleep(rand(5));

#  $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
  $dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
  print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
  $dbh->commit || die "Cannot commit";
}

my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;

for(my $i=0;$i<200;$i++)
{
  if (!fork)
  {
    worker($i);
    exit(0);
  }

  if (++$numchild > 50)
  {
    sleep(1);
  }
}
while (wait > 0)
{
  $numchild--;
  print "Waiting numchild $numchild\n";
  sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
  printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
  $sum += $slot->[2];
}
print "Successfully made $sum entries\n";

【讨论】:

  • 好主意。明天我将制作一个从我们的应用程序中提取出来的更简单的版本。我担心the row locking documentation 的第一段说第二个事务可以选择已经锁定但尚未更新的行,然后在第一个事务完成后覆盖更新。另一方面,您的示例准确地显示了我在开始这条道路之前认为它是如何工作的。我希望你是对的。感谢您为此付出所有努力!
  • @joegester:我阅读了文档,它说的是另一个事务可以读取该行,但这与能够选择它有很大不同FOR UPDATE。如果您FOR UPDATE,则保证WHERE 子句为真,直到您的查询更新这些行或您完成事务。
  • 谢谢,赛斯!我从根本上误解了 SELECT FOR UPDATE 的工作原理。
  • 脚本实际上应该返回什么?
  • @Christian Schmitt:。已经很长时间了,但看起来最后成功打印的应该是 200。表转储将是半随机的。希望分布均匀
【解决方案3】:

你可能想看看advisorylocks

尚未测试,但可以像这样重写您的锁定查询:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock('slots'::regclass::int, id::int)
    LIMIT 1;

或者,由于您首先使用的是 bigint(您需要那么多 id?!?),类似于:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock(hashtext('slots_' || id))
    LIMIT 1;

如果您这样做,请注意陷阱 - 无论事务是否成功,都需要每个会话显式解锁咨询锁。

hashtext() 也有发生碰撞的风险,但如果您正在处理作业,这对您来说没什么大不了的......

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-03-22
    • 2014-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多