【问题标题】:DynamoDb conditional INSERTSDynamoDb 条件插入
【发布时间】:2019-07-22 13:26:09
【问题描述】:

我在我的应用程序中使用 C# 和 DynamoDb。 我的设计假设只写只读。强烈禁止更新项目。仅插入新项目或读取现有项目。
假设我有付款项目的状态

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:00:00#Approved" //SK
}

现在 2 个并发请求来自不同的客户端:第一个尝试 Cancel 付款,第二个尝试 Settle(confirm) 付款。

2 个插入是:

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:01:00#Cancel" //SK
}

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:01:00#Settle" //SK
}

所以这是竞争条件。
- 如果取消,您将无法结算付款
- 如果已经结算,您将无法取消付款

明显的解决办法是:
1) 创建交易
2)先查询,然后根据业务规则检查是否可以插入
3) 插入新项目

所以问题是: 1)是否可以锁定整个分区以防止从其他客户端插入新项目? 2) 是否有任何内置选项,如条件更新,但对于 插入

【问题讨论】:

  • 您的表模式是在生产中,还是在设计阶段?根据您是否可以更改架构,我会有不同的答案。
  • 现在只是一个设计jor,所以PK存储transactionId,sk是Status和datetime的concat
  • @MatthewPope 忘了提及你

标签: c# amazon-dynamodb


【解决方案1】:

在我开始之前快速澄清一下。我假设一个分布式的、面向服务的架构,并且我假设对这个 DynamoDB 表的所有读取和写入都只通过一个服务发生。我将使用“应用程序”来指代您正在构建的访问表的软件,并使用“客户端”来指代任何属于您的应用程序客户端的东西。


是否有任何内置选项,例如条件更新,但用于插入项目?

简短的回答是“是”,使用基于version numbersoptimistic locking

您需要首先将排序键更改为顺序事件编号。这将是用于对您的项目进行版本控制的属性,它使用条件更新,并且在解决您的问题的任何解决方案中产生的额外开销最少。

让我们首先查看建议架构的一些示例数据。我冒昧地添加了更多的状态类型。

invoiceId | eventNo | eventStatus |      datetime
----------|---------|-------------|---------------------
      111 |       0 | created     | 2019-07-11T00:01:00
      111 |       1 | approved    | 2019-07-11T00:02:00
      111 |       2 | modified    | 2019-07-12T00:03:00
      111 |       3 | approved    | 2019-07-12T00:04:00
      111 |       4 | settled     | 2019-07-13T00:05:00

乐观锁定的一般思想是读取当前状态,然后通过插入带有递增 eventNo(相当于 AWS 文档中的 version)的新记录来更新状态,条件是 @ 987654336@ 尚不存在于该invoiceId 中。这样做的原因是,当您读取现有状态时,您总是知道下一个 eventNo 应该是什么(与使用时间戳作为排序键不同)。

为了更具体一点,在2019-07-13 上,当客户端发送结算发票的请求时,您的应用程序会读取最新状态,看到eventNo 为3,status 为“已批准”,所以它向 DynamoDB 提交了一个UpdateItem 请求(翻译成简单的英语)说

仅当invoiceId=111eventNo=4 不存在其他状态更新时,才插入带有invoiceId=111eventNo=4 的状态更新

如果两个客户端同时尝试更新状态,只有一个 UpdateItem 请求会成功,另一个会返回 ConditionalCheckFailedException

好的,我该如何编写代码?

我已经十多年没有使用过 C#,所以请原谅可能存在的任何语法或格式错误。

AmazonDynamoDBClient client = new AmazonDynamoDBClient();

// These should be method parameters/args, but I'm directly assigning them to 
// keep this code sample simple.
var invoiceToUpdate = 123;
var invoiceNewState = "settled";

// Here's the useful part of the sample code

// First we make a query to get the current state
var queryRequest = new QueryRequest
{
    TableName = "Invoices",
    KeyConditionExpression = "invoiceId = :inv",
    ExpressionAttributeValues = new Dictionary<string, AttributeValue> {
        {":inv", new AttributeValue {N = invoiceIdToUpdate.toString() }}
    },

    // This assumes we only need to check the current state and not any of the historical
    // state, so we'll limit the query to return only one result.
    Limit = 1,

    // If we're limiting it to only one result, change the sort order to make sure we get
    // the result with the largest eventNo (and therefore the most recent state).
    ScanIndexForward = false,

    // This is not strictly necessary for correctness because of the condition expression
    // in the PutItem request, but including it will help reduce the likelihood of getting
    // a ConditionalCheckFailedException later on.
    ConsistentRead = true
};

var queryResponse = client.Query(queryRequest);

// Check to see if there is any previous record for this invoice
// Setup the default values if the query returned no results
int newEventNo = 0;
string invoiceCurrentState = null;
if (queryResponse.Items.Count > 0) {{
    // If there is any existing record, then increment the eventNo for the new record
    var latestRecord = queryResponse.QueryResult().Items[0];
    newEventNo = Convert.ToInt32(latestRecord["eventNo"]) + 1;
    invoiceCurrentState = latestRecord["eventStatus"];
}

var isValidChange = MyBusinessLogic.isValidChange(invoiceCurrentState, invoiceNewState);

if (isValidChange) {
    var putItemRequest = new PutItemRequest
    {
        TableName = "Invoices",
        Item = new Dictionary<string,AttributeValue>() { 
            { "invoiceId", new AttributeValue {N = invoiceIdToUpdate.toString() }},
            { "eventNo", new AttributeValue {N = newEventNo.toString()}},
            { "eventStatus", new AttributeValue {S = invoiceNewState}},
            { "datetime", new AttributeValue {S = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ") }}
        },

        // Every item must have the key attributes, so using 'attribute_not_exists'
        // on a key attribute is functionally equivalent to an "item_not_exists" 
        // condition, causing the PUT to fail if it would overwrite anything at all.
        ConditionExpression = "attribute_not_exists('invoiceId')"
    };

    try {
        var putItemResponse = client.PutItem(putItemRequest);

    } catch (ConditionalCheckFailedException ex) {
        // How you handle this is up to you. I recommend choosing one of these options: 
        // (1) Throw an exception with a more useful message explaining that the state changed while the
        //     request was being processed
        // (2) Automatically try again, starting with the query and including the business validations,
        //     and if the state change is still valid, submit a new PutItem request with the new eventNo.
    }

    // Return an acknowledgement to the client

} else {
    throw new System.InvalidOperationException("Update is not valid for the current status of the invoice.");
}

这是我给出的代码示例的一些相关文档。


是否可以锁定整个分区以防止插入来自其他客户端的新项目?

是的,但它不是在数据库中实现的锁。锁定必须在您的应用程序中使用会产生额外开销的单独锁定库进行,因此除非您没有其他选择,否则不应使用此方法。对于无法更改表架构的任何阅读问题的人,您可以将应用程序设置为使用DynamoDB lock client 锁定分区键,读取当前状态,执行写入(如果允许),然后释放锁。

【讨论】:

  • 很好的解释。这种解决方案的唯一一个问题 - 这个排序键也是 GSI 之一的分区键,所以如果我输入 1、2、3 或其他东西,发电机将把所有事务的所有状态放入该分区。所以这将是一个巨大的分区。有他们这样好吗?除非我通过这个分区查询。如果我然后将 InvoiceNumber+Sequence 之类的排序键组合起来,那可以吗?在应用程序级别,我基本上会提取该序列并增加其数量?
  • 您可以使用任何您想要的 GSI 键。不过,不要更改主表的关键属性。
  • 另外,大分区也不再需要担心了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-10
相关资源
最近更新 更多