【发布时间】:2011-03-27 01:00:47
【问题描述】:
背景
为了响应question,我构建了uploaded a bounded-tchan(我不适合上传jnb's version)。如果名称不够,则 bounded-tchan (BTChan) 是具有最大容量的 STM 通道(如果通道已满,则写入块)。
最近,我收到了一个添加 dup 功能的请求,例如 regular TChan's。问题就这样开始了。
BTChan 的外观
下面是 BTChan 的简化(实际上是非功能性)视图。
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
每次您写入频道时,您都会在元组中包含重复次数 (nrDups) - 这是一个“单个元素计数器”,表示有多少读者获得了该元素。
每个读取器都会减少它读取的元素的计数器,然后将它的读取指针移动到列表中的下一个元素。如果阅读器将计数器减至零,则count 的值会减少以正确反映通道上的可用容量。
要明确所需的语义:通道容量表示通道中排队的最大元素数。任何给定的元素都会排队,直到每个 dup 的读取器接收到该元素。任何元素都不应为 GCed dup 排队(这是主要问题)。
例如,让容量为 2 的通道(c1、c2、c3)有 3 个 dup,其中 2 个项目被写入通道,然后所有项目都从c1 和c2 中读出。频道仍满(剩余容量为 0),因为 c3 尚未消耗其副本。在任何时间点,如果对c3 的所有引用都被删除(因此c3 被GCed),那么容量应该被释放(在这种情况下恢复为2)。
问题来了:假设我有以下代码
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
使 BTChan 看起来像:
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
注意最后"hello" 的读取计数仍然是1?这意味着该消息不会被视为已消失(即使它会在实际实现中被 GC),并且我们的 count 永远不会递减。由于通道已满载(最多 1 个元素),因此写入器将始终阻塞。
我想要在每次调用 dupBTChan 时创建一个终结器。当收集到复制(或原始)通道时,该通道上剩余的所有要读取的元素将递减每个元素的计数,nrDups 变量也将递减。因此,未来的写入将具有正确的count(count,不会为 GCed 通道未读取的变量保留空间)。
解决方案 1 - 手动资源管理(我想避免的)
JNB 的 bounded-tchan 实际上有手动资源管理的原因。请参阅cancelBTChan。我打算让用户更难出错(并不是说手动管理在许多情况下不是正确的方法)。
解决方案 2 - 通过阻止 TVar 使用异常(GHC 无法按照我的意愿执行此操作)
编辑这个解决方案,解决方案 3 只是一个衍生产品,不起作用!由于bug 5055 (WONTFIX),GHC 编译器将异常发送到两个阻塞线程,即使一个就足够了(理论上可以确定,但对于 GHC GC 不实用)。
如果获得BTChan 的所有方法都是IO,我们可以forkIO 一个线程读取/重试给定BTChan 唯一的额外(虚拟)TVar 字段。当所有其他对 TVar 的引用都被删除时,新线程将捕获异常,因此它将知道何时减少 nrDups 和单个元素计数器。这应该可行,但会强制我的所有用户使用 IO 来获取他们的BTChans:
data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO ()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
编辑:是的,这是一个糟糕的 man 终结器,我没有任何特别的理由避免使用 addFinalizer。那将是相同的解决方案,仍然强制使用 IO afaict。
解决方案 3:比解决方案 2 更简洁的 API,但 GHC 仍然不支持它
用户通过调用initBTChanCollector 来启动一个管理器线程,它将监控一组这些虚拟 TVar(来自解决方案 2)并进行所需的清理。基本上,它将 IO 推入另一个线程,该线程通过全局 (unsafePerformIOed) TVar 知道该做什么。事情基本上像解决方案 2 一样工作,但 BTChan 的创建仍然可以是 STM。未能运行initBTChanCollector 将导致进程运行时任务列表不断增长(空间泄漏)。
解决方案 4:永远不允许丢弃 BTChans
这类似于忽略问题。如果用户从不丢掉一个欺骗的BTChan,那么问题就消失了。
解决方案 5 我看到了 ezyang 的回答(完全有效和赞赏),但我真的很想保留当前的 API 只是一个 'dup' 函数。
** 解决方案 6** 请告诉我有更好的选择。
编辑:
我 implemented solution 3(完全未经测试的 alpha 版本)并通过将全局本身设为 BTChan 来处理潜在的空间泄漏 - chan 的容量可能应该为 1,所以忘记运行 init 会很快出现,但这是一个微小的变化。这在 GHCi (7.0.3) 中有效,但这似乎是偶然的。 GHC 会向两个被阻塞的线程(读取 BTChan 和监视线程的有效线程)抛出异常,所以如果你在另一个线程丢弃它的引用时被阻止读取 BTChan,那么你就会死。
【问题讨论】:
-
我不明白你到底在想什么。对于资源,重复通道的语义应该是什么?如果通道和副本都已满,则通道会阻塞?如果其中一个已满?
-
对,这些语义需要澄清一下。如果您尝试实现“如果通道和副本都已满,则通道阻塞”,那么您需要问,我是否允许从队列中删除元素?如果答案是否定的,那么您又获得了一个无限通道。
-
(询问请求 dup 的人他们打算使用它的目的也很有用。)
-
@Heinrich, ezyang 一个频道的 Dups 都共享资源,一个频道的剩余容量等于该频道的最大容量减去人口最多的副本中剩余要读取的元素数。如果删除了对副本的所有引用,则该 dup 永远不会导致容量减少(即接收人口为零)。我刚才试图在编辑中用不同的词表达同样的意思。
-
在这种情况下,您似乎需要一个有界通道和一个多路复用器。所以听起来你可能只使用 MVar 来控制对队列的访问(他们会阻止尝试获取变量,但如果 MVar 上阻塞的线程死亡,它会自行释放)
标签: haskell concurrency transactional-memory