为了了解commute 的工作原理,我做了一些实验。我想把我的解释分成三部分:
比较和设置语义
我认为Clojure for the Brave and True已经解释得很好:
swap! 实现“比较和设置”语义,这意味着它在内部执行以下操作:
- 它读取原子的当前状态
- 然后它将更新函数应用于该状态
- 接下来,它会检查它在步骤 1 中读取的值是否与原子的当前值相同
- 如果是,则交换!更新原子以引用第 2 步的结果
- 如果不是,则交换!重试,再次执行第 1 步。
swap! 是针对atom 的,但是知道它会帮助我们理解alter 和commute,因为他们使用类似的方法来更新ref。
与atom、ref 不同,修改(通过alter、commute、ref-set)必须包装在事务中。当事务开始(或重试)时,它将捕获所有包含ref 的快照(因为alter 需要它)。 ref 只有在事务提交时才会被修改。
alter
在一个事务中,所有将被alter修改的ref组成一个组。如果组中的任何一个ref 更改失败,事务将被重试。基本上alter 做了以下事情:
- 将其更改的
ref 与事务捕获的快照进行比较。如果它们看起来不同,请重试事务;其他
- 使用提供的函数从快照创建一个新状态。
- 再次将
ref 与快照进行比较。如果它们看起来不同,请重试事务;其他
- 尝试写锁定
ref,在此事务试用结束之前不要让任何人修改它。如果失败(ref 已经被锁定),等待一段时间(例如 100 毫秒),然后重试事务。
- 告诉事务在执行委托时将此
ref 更新为新状态。
让我们演示一个平滑的更改。首先,我们将创建一个线程t1 到alter 3 个计数器c1、c2 和c3 和slow-inc。
(ns testing.core)
(def start (atom 0)) ; Record start time.
(def c1 (ref 0)) ; Counter 1
(def c2 (ref 0)) ; Counter 2
(def c3 (ref 0)) ; Counter 3
(defn milliTime
"Get current time in millisecond."
[]
(int (/ (System/nanoTime) 1000000)))
(defn lap
"Get elapse time since 'start' in millisecond."
[]
(- (milliTime) @start))
(defn slow-inc
"Slow increment, takes 1 second."
[x x-name]
(println "slow-inc beg" x-name ":" x "|" (lap) "ms")
(Thread/sleep 1000)
(println "slow-inc end" x-name ":" (inc x) "|" (lap) "ms")
(inc x))
(defn fast-inc
"Fast increment. The value it prints is incremented."
[x x-name]
(println "fast-inc " x-name ":" (inc x) "|" (lap) "ms")
(inc x))
(defn -main
[]
;; Initialize c1, c2, c3 and start.
(dosync (ref-set c1 0)
(ref-set c2 0)
(ref-set c3 0))
(reset! start (milliTime))
;; Start two new threads simultaneously.
(let [t1 (future
(dosync
(println "transaction start |" (lap) "ms")
(alter c1 slow-inc "c1")
(alter c2 slow-inc "c2")
(alter c3 slow-inc "c3")
(println "transaction end |" (lap) "ms")))
t2 (future)]
;; Dereference all of them (wait until all 2 threads finish).
@t1 @t2
;; Print final counters' values.
(println "c1 :" @c1)
(println "c2 :" @c2)
(println "c3 :" @c3)))
我们得到了这个:
transaction start | 3 ms ; 1st try
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms
slow-inc end c2 : 1 | 2010 ms
slow-inc beg c3 : 0 | 2010 ms
slow-inc end c3 : 1 | 3011 ms
transaction end | 3012 ms
c1 : 1
c2 : 1
c3 : 1
过程顺利。没有惊喜。
让我们看看如果ref(比方说c3)在修改之前在修改((alter c3 ...))会发生什么。我们将在更改c1 时对其进行修改。编辑let对t2的绑定为:
t2 (future
(Thread/sleep 900) ; Increment at 900 ms
(dosync (alter c3 fast-inc "c3")))
结果:
transaction start | 2 ms ; 1st try
slow-inc beg c1 : 0 | 7 ms
fast-inc c3 : 1 | 904 ms ; c3 being modified in thread t2
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms
slow-inc end c2 : 1 | 2010 ms
transaction start | 2011 ms ; 2nd try
slow-inc beg c1 : 0 | 2011 ms
slow-inc end c1 : 1 | 3012 ms
slow-inc beg c2 : 0 | 3013 ms
slow-inc end c2 : 1 | 4014 ms
slow-inc beg c3 : 1 | 4015 ms
slow-inc end c3 : 2 | 5016 ms
transaction end | 5016 ms
c1 : 1
c2 : 1
c3 : 2
如您所见,在 1st-try-(alter c3 ...) 的第 1 步中,它意识到c3 (val = 1) 看起来与事务捕获的快照 (val = 0) 不同,因此它重试事务。
现在,如果ref(比方说c1)在在其更改((alter c1 ...))期间被修改了怎么办?我们将在线程t2 上修改c1。编辑let对t2的绑定为:
t2 (future
(Thread/sleep 900) ; Increment at 900 ms
(dosync (alter c1 fast-inc "c1")))
结果:
transaction start | 3 ms ; 1st try
slow-inc beg c1 : 0 | 8 ms
fast-inc c1 : 1 | 904 ms ; c1 being modified in thread t2
slow-inc end c1 : 1 | 1008 ms
transaction start | 1009 ms ; 2nd try
slow-inc beg c1 : 1 | 1009 ms
slow-inc end c1 : 2 | 2010 ms
slow-inc beg c2 : 0 | 2011 ms
slow-inc end c2 : 1 | 3011 ms
slow-inc beg c3 : 0 | 3012 ms
slow-inc end c3 : 1 | 4013 ms
transaction end | 4014 ms
c1 : 2
c2 : 1
c3 : 1
这一次,在1st-try-(alter c1 ...)的第3步,发现ref被修改了,所以调用事务重试。
现在,让我们尝试修改ref(比如说c1)在它的修改((alter c1 ...))之后。我们会在修改c2时对其进行修改。
t2 (future
(Thread/sleep 1600) ; Increment at 1600 ms
(dosync (alter c1 fast-inc "c1")))
结果:
transaction start | 3 ms ; 1st try
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1009 ms
slow-inc beg c2 : 0 | 1010 ms
fast-inc c1 : 1 | 1604 ms ; try to modify c1 in thread t2, but failed
fast-inc c1 : 1 | 1705 ms ; keep trying...
fast-inc c1 : 1 | 1806 ms
fast-inc c1 : 1 | 1908 ms
fast-inc c1 : 1 | 2009 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc c1 : 1 | 2110 ms ; still trying...
fast-inc c1 : 1 | 2211 ms
fast-inc c1 : 1 | 2312 ms
fast-inc c1 : 1 | 2413 ms
fast-inc c1 : 1 | 2514 ms
fast-inc c1 : 1 | 2615 ms
fast-inc c1 : 1 | 2716 ms
fast-inc c1 : 1 | 2817 ms
fast-inc c1 : 1 | 2918 ms ; and trying....
slow-inc end c3 : 1 | 3012 ms
transaction end | 3013 ms ; 1st try ended, transaction committed.
fast-inc c1 : 2 | 3014 ms ; finally c1 modified successfully
c1 : 2
c2 : 1
c3 : 1
由于1st-try-(alter c1 ...)已经锁定c1(第4步),所以没有人可以修改c1,直到本轮交易试用结束。
alter 就是这样。
那么,如果我们不想将c1、c2、c3 全部组合在一起怎么办?假设我想在c1 或c3 更改失败(在事务期间被其他线程修改)时重试事务仅。我不在乎c2 的状态。如果在交易过程中修改了c2,则无需重试交易,这样可以节省一些时间。我们如何做到这一点?是的,通过commute。
commute
基本上,commute 执行以下操作:
-
直接使用
ref(不是来自快照)运行提供的函数,但不对结果执行任何操作。
- 在事务提交之前要求事务调用
real-commute 并使用相同的参数。 (real-commute 只是我编造的名字。)
我实际上不知道为什么commute 必须运行第 1 步。在我看来,仅第 2 步就足够了。 real-commute 执行以下操作:
- 如果
ref 未被锁定,则读写锁定ref 直到该事务试用结束,否则重试该事务。
- 使用给定函数从
ref创建一个新状态。
- 告诉事务在执行委托时将此
ref 更新为新状态。
让我们检查一下。将let的绑定编辑成:
t1 (future
(dosync
(println "transaction start |" (lap) "ms")
(alter c1 slow-inc "c1")
(commute c2 slow-inc "c2") ; changed to commute
(alter c3 slow-inc "c3")
(println "transaction end |" (lap) "ms")))
t2 (future)
结果:
transaction start | 3 ms
slow-inc beg c1 : 0 | 7 ms ; called by alter
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms ; called by commute
slow-inc end c2 : 1 | 2009 ms
slow-inc beg c3 : 0 | 2010 ms ; called by alter
slow-inc end c3 : 1 | 3011 ms
transaction end | 3012 ms
slow-inc beg c2 : 0 | 3012 ms ; called by real-commute
slow-inc end c2 : 1 | 4012 ms
c1 : 1
c2 : 1
c3 : 1
所以如果你使用commute,slow-inc 会被调用两次,一次被commute 调用,一次被real-commute 在事务提交之前调用。第一个commute 没有对slow-inc 的结果做任何事情。
slow-inc 可以被调用两次以上。比如我们尝试修改线程t2上的c3:
t2 (future
(Thread/sleep 500) ; modify c3 at 500 ms
(dosync (alter c3 fast-inc "c3")))
结果:
transaction start | 2 ms
slow-inc beg c1 : 0 | 8 ms
fast-inc c3 : 1 | 504 ms ; c3 modified at thread t2
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms ; 1st time
slow-inc end c2 : 1 | 2010 ms
transaction start | 2012 ms
slow-inc beg c1 : 0 | 2012 ms
slow-inc end c1 : 1 | 3013 ms
slow-inc beg c2 : 0 | 3014 ms ; 2nd time
slow-inc end c2 : 1 | 4015 ms
slow-inc beg c3 : 1 | 4016 ms
slow-inc end c3 : 2 | 5016 ms
transaction end | 5017 ms
slow-inc beg c2 : 0 | 5017 ms ; 3rd time
slow-inc end c2 : 1 | 6018 ms
c1 : 1
c2 : 1
c3 : 2
在事务的第一次试用中,(commute c2 ...) 被评估后,(alter c3 ...) 发现c3 与快照不同,因此触发事务重试。如果(alter c3 ...)在(commute c2 ...)之前,则在评估或(commute c2 ..)之前触发重试。因此,将所有 commutes 放置在所有 alters 之后可能会节省一些时间。
让我们看看如果在t1 中的事务正在评估时修改线程t2 中的c2 会发生什么。
t2 (future
(Thread/sleep 500) ; before evaluation of (commute c2 ...)
(dosync (alter c2 fast-inc "c2"))
(Thread/sleep 1000) ; during evaluation of (commute c2 ...)
(dosync (alter c2 fast-inc "c2"))
(Thread/sleep 1000) ; after evaluation of (commute c2 ...)
(dosync (alter c2 fast-inc "c2")))
结果:
transaction start | 3 ms
slow-inc beg c1 : 0 | 9 ms
fast-inc c2 : 1 | 504 ms ; before
slow-inc end c1 : 1 | 1009 ms
slow-inc beg c2 : 1 | 1010 ms
fast-inc c2 : 2 | 1506 ms ; during
slow-inc end c2 : 2 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc c2 : 3 | 2508 ms ; after
slow-inc end c3 : 1 | 3013 ms
transaction end | 3013 ms
slow-inc beg c2 : 3 | 3014 ms
slow-inc end c2 : 4 | 4014 ms
c1 : 1
c2 : 4
c3 : 1
如您所见,没有交易重试,c2 仍然更新为我们的预期值 (4),感谢real-commute。
现在我想在real-commute 中演示第1 步的效果:它的ref 是读写锁定的。首先,确认它是读锁定的:
t2 (future
(Thread/sleep 3500) ; during real-commute
(println "try to read c2:" @c2 " |" (lap) "ms"))
结果:
transaction start | 3 ms
slow-inc beg c1 : 0 | 9 ms
slow-inc end c1 : 1 | 1010 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
slow-inc end c3 : 1 | 3012 ms
transaction end | 3013 ms
slow-inc beg c2 : 0 | 3013 ms
slow-inc end c2 : 1 | 4014 ms
try to read c2: 1 | 4015 ms ; got printed after transaction trial ended
c1 : 1
c2 : 1
c3 : 1
@c2 被阻止,直到 c2 被解锁。这就是为什么println 在 4000 毫秒后得到评估,即使我们的订单是休眠 3500 毫秒。
由于commute 和alter 需要读取它们的ref 来执行给定的功能,它们将被阻止,直到它们的ref 也被解锁。您可以尝试将(println ...) 替换为(alter c2 fast-inc "c2")。效果应该和这个例子一样。
所以,为了确认它是写锁定的,我们可以使用ref-set:
t2 (future
(Thread/sleep 3500) ; during real-commute
(dosync (ref-set c2 (fast-inc 9 " 8"))))
结果:
transaction start | 3 ms
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
slow-inc end c3 : 1 | 3013 ms
transaction end | 3014 ms
slow-inc beg c2 : 0 | 3014 ms
fast-inc 8 : 9 | 3504 ms ; try to ref-set but failed
fast-inc 8 : 9 | 3605 ms ; try again...
fast-inc 8 : 9 | 3706 ms
fast-inc 8 : 9 | 3807 ms
fast-inc 8 : 9 | 3908 ms
fast-inc 8 : 9 | 4009 ms
slow-inc end c2 : 1 | 4015 ms
fast-inc 8 : 9 | 4016 ms ; finally success, c2 ref-set to 9
c1 : 1
c2 : 9
c3 : 1
从这里你也可以猜到ref-set做了什么:
- 如果其
ref已被写锁定,则在一段时间后重试事务(例如100毫秒);否则告诉交易在执行佣金时将此ref 更新为给定值。
real-commute 也可能失败,当它的ref 在第 1 步被锁定时。与alter 或ref-set 不同,它在重试事务之前不会等待一段时间。如果ref 锁定时间过长,这可能会导致问题。例如,我们将尝试修改 c1 后,使用 commute:
t2 (future
(Thread/sleep 2500) ; during alteration of c3
(dosync (commute c1 fast-inc "c1")))
结果:
transaction start | 3 ms
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc c1 : 1 | 2506 ms
fast-inc c1 : 1 | 2506 ms
fast-inc c1 : 1 | 2506 ms
...
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Transaction failed after reaching retry
limit, compiling: ...
回想一下,c1 在更改后被alter 写锁定,因此real-commute 不断失败并不断重试事务。没有缓冲时间,就达到了交易重试上限,火爆起来。
注意
commute 通过让用户减少将导致事务重试的ref 来帮助提高并发性,调用给定函数至少两次以更新其ref 的成本。在某些情况下,commute 可能比alter 慢。例如,当事务中唯一要做的事情是更新ref,commute 的成本高于alter:
(def c (ref 0)) ; counter
(defn slow-inc
[x]
(Thread/sleep 1000)
(inc x))
(defn add-2
"Create two threads to slow-inc c simultaneously with func.
func can be alter or commute."
[func]
(let [t1 (future (dosync (func c slow-inc)))
t2 (future (dosync (func c slow-inc)))]
@t1 @t2))
(defn -main
[& args]
(dosync (ref-set c 0))
(time (add-2 alter))
(dosync (ref-set c 0))
(time (add-2 commute)))
结果:
"Elapsed time: 2003.239891 msecs" ; alter
"Elapsed time: 4001.073448 msecs" ; commute
这是alter的程序:
- 0 毫秒:
t1 的 alter 已启动。
- 1 毫秒:
t2 的 alter 已启动。
- 1000 毫秒:
t1 的 alter 成功,t1 已提交,c 变为 1。
- 1001 毫秒:
t2 的alter 发现c 与其快照不同(步骤2),重试事务。
- 2001 毫秒:
t2 的 alter 成功,t2 已提交,c 变为 2。
以及commute的程序:
- 0 毫秒:
t1 的 commute 已启动。
- 1 毫秒:
t2 的 commute 已启动。
- 1000 毫秒:
t1 的 real-commute 已启动。 c 已锁定。
- 1001 毫秒:
t2 的 real-commute 已启动。发现c被锁定,重试事务(步骤1)。
- 1002 毫秒:
t2 的 commute 已启动,但 c 已锁定,因此被阻止。
- 2000 毫秒:
t1 的real-commute 结束,事务已提交。 c 变为 1。t2 已解锁。
- 3002 毫秒:
t2 的 real-commute 已启动。
- 4002 毫秒:
t2 的real-commute 结束,事务已提交。 c 变成了 2。
这就是本例中commute 比alter 慢的原因。
这可能与来自 clojuredocs.org 的 example of commute 相矛盾。关键区别在于,在他的示例中,延迟(100 毫秒)发生在事务主体中,但在我的示例中,延迟发生在 slow-inc 中。这种差异导致他的real-commute 阶段运行非常快,从而减少了锁定时间和阻塞时间。更少的锁定时间意味着更少的重试概率。这就是为什么在他的示例中,commute 比 alter 快。把他的inc改成slow-inc,你会得到和我一样的观察结果。
就是这样。