提供一个具体的例子(因为在线上令人惊讶地缺乏正确的例子):这里是如何在 CouchDB 中实现“atomic bank balance transfer”(大部分复制自我关于同一主题的博客文章:http://blog.codekills.net/2014/03/13/atomic-bank-balance-transfer-with-couchdb/)
首先,简要回顾一下这个问题:银行系统如何能够允许
要在账户之间转移的钱被设计成没有种族
可能留下无效或无意义的余额的条件?
这个问题有几个部分:
首先:事务日志。而不是将帐户的余额存储在一个单一的
记录或文件 — {"account": "Dave", "balance": 100} — 帐户的
余额是通过汇总该帐户的所有贷方和借方来计算的。
这些贷方和借方存储在交易日志中,可能看起来
像这样:
{"from": "Dave", "to": "Alex", "amount": 50}
{"from": "Alex", "to": "Jane", "amount": 25}
计算余额的 CouchDB map-reduce 函数看起来
像这样:
POST /transactions/balances
{
"map": function(txn) {
emit(txn.from, txn.amount * -1);
emit(txn.to, txn.amount);
},
"reduce": function(keys, values) {
return sum(values);
}
}
为完整起见,以下是余额列表:
GET /transactions/balances
{
"rows": [
{
"key" : "Alex",
"value" : 25
},
{
"key" : "Dave",
"value" : -50
},
{
"key" : "Jane",
"value" : 25
}
],
...
}
但这留下了一个明显的问题:如何处理错误?如果发生什么
有人试图进行大于余额的转账?
使用 CouchDB(和类似的数据库)这种业务逻辑和错误
处理必须在应用程序级别实现。天真地,这样的功能
可能看起来像这样:
def transfer(from_acct, to_acct, amount):
txn_id = db.post("transactions", {"from": from_acct, "to": to_acct, "amount": amount})
if db.get("transactions/balances") < 0:
db.delete("transactions/" + txn_id)
raise InsufficientFunds()
但请注意,如果应用程序在插入事务之间崩溃
并检查更新后的余额,数据库将处于不一致的状态
state:发件人可能会留下负余额,而收件人
以前不存在的钱:
// Initial balances: Alex: 25, Jane: 25
db.post("transactions", {"from": "Alex", "To": "Jane", "amount": 50}
// Current balances: Alex: -25, Jane: 75
如何解决这个问题?
为了确保系统永远不会处于不一致状态,两个部分
每笔交易都需要添加信息:
创建事务的时间(以确保有strict
total ordering 个事务),以及
状态——交易是否成功。
还需要有两个视图——一个返回一个可用的帐户
余额(即所有“成功”交易的总和),另一个
返回最旧的“待处理”交易:
POST /transactions/balance-available
{
"map": function(txn) {
if (txn.status == "successful") {
emit(txn.from, txn.amount * -1);
emit(txn.to, txn.amount);
}
},
"reduce": function(keys, values) {
return sum(values);
}
}
POST /transactions/oldest-pending
{
"map": function(txn) {
if (txn.status == "pending") {
emit(txn._id, txn);
}
},
"reduce": function(keys, values) {
var oldest = values[0];
values.forEach(function(txn) {
if (txn.timestamp < oldest) {
oldest = txn;
}
});
return oldest;
}
}
转帐列表现在可能如下所示:
{"from": "Alex", "to": "Dave", "amount": 100, "timestamp": 50, "status": "successful"}
{"from": "Dave", "to": "Jane", "amount": 200, "timestamp": 60, "status": "pending"}
接下来,应用程序需要有一个可以解析的函数
通过检查每个待处理的交易以验证它是
有效,然后将其状态从“待定”更新为“成功”或
“拒绝”:
def resolve_transactions(target_timestamp):
""" Resolves all transactions up to and including the transaction
with timestamp `target_timestamp`. """
while True:
# Get the oldest transaction which is still pending
txn = db.get("transactions/oldest-pending")
if txn.timestamp > target_timestamp:
# Stop once all of the transactions up until the one we're
# interested in have been resolved.
break
# Then check to see if that transaction is valid
if db.get("transactions/available-balance", id=txn.from) >= txn.amount:
status = "successful"
else:
status = "rejected"
# Then update the status of that transaction. Note that CouchDB
# will check the "_rev" field, only performing the update if the
# transaction hasn't already been updated.
txn.status = status
couch.put(txn)
最后,正确执行传输的应用代码:
def transfer(from_acct, to_acct, amount):
timestamp = time.time()
txn = db.post("transactions", {
"from": from_acct,
"to": to_acct,
"amount": amount,
"status": "pending",
"timestamp": timestamp,
})
resolve_transactions(timestamp)
txn = couch.get("transactions/" + txn._id)
if txn_status == "rejected":
raise InsufficientFunds()
几点说明:
为简洁起见,此特定实现假定一定数量的
CouchDB 的 map-reduce 中的原子性。更新代码使其不依赖
该假设留给读者作为练习。
没有考虑主/主复制或 CouchDB 的文档同步
考虑。主/主复制和同步使这个问题
困难得多。
在实际系统中,使用time() 可能会导致冲突,因此使用
熵多一点的东西可能是个好主意;可能是"%s-%s"
%(time(), uuid()),或者在排序中使用文档的_id。
包括时间不是绝对必要的,但它有助于保持逻辑
如果多个请求几乎同时进入。