【问题标题】:flushing the content of a core.async channel刷新 core.async 通道的内容
【发布时间】:2017-03-12 15:36:37
【问题描述】:

考虑一个 core.async 通道,它是这样创建的:

(def c (chan))

让我们假设值从不同的地方(例如,在 go-loops 中)放入并带到这个通道。

如何在某个时间刷新通道上的所有项目?

例如,可以将通道设为原子,然后发生这样的事件:

(def c (atom (chan))

(defn reset []
  (close! @c)
  (reset! c (chan)))

还有其他方法吗?

【问题讨论】:

  • 频道从不同地方读取的原因是什么?你不能保证它会在哪里被阅读
  • 关闭通道不会“刷新”项目。他们将在频道中等待直到阅读。

标签: clojure clojurescript core.async


【解决方案1】:

使用into 将所有内容读取到向量中,不要使用它。

(go (async/into [] c)) 

【讨论】:

    【解决方案2】:

    让我们更清楚地定义您似乎想要做什么:您的代码在多个 go-loop 中运行,每个循环都将数据放在同一个通道上。您希望能够告诉他们所有人:“您正在投放价值观的渠道已经不行了;从现在开始,将您的价值观放在其他渠道上。”如果这不是您想要做的,那么您最初的问题就没有多大意义,因为没有“冲洗”要做 - 您要么接受放在频道上的值,要么不接受。

    首先,了解您的方法行不通的原因,您的问题涉及到的原因:如果您deref 一个原子c,您将获得一个通道,并且该值始终是相同的通道。您在go-loops 中有代码,已调用>!,目前已停放,等待接受者。当您关闭 @c 时,那些停放的线程保持停放(任何人在从频道 (<!)采取 时停放的人将在频道关闭时立即获得值 nil,但停放 >! s 将保持停放)。你可以整天reset!c,但停放的线程仍然停在他们从derefing 获得的先前值上。

    那么,你是怎么做到的呢?这是一种方法。

    (require '[clojure.core.async :as a
      :refer [>! <! >!! <!! alt! take! go-loop chan close! mult tap]])
    
    (def rand-int-chan (chan))
    (def control-chan (chan))
    (def control-chan-mult (mult control-chan))
    
    (defn create-worker
      [put-chan control-chan worker-num]
      (go-loop [put-chan put-chan]
        (alt!
          [[put-chan (rand-int 10)]]
          ([_ _] (println (str "Worker" worker-num " generated value."))
                 (recur put-chan))
    
          control-chan
          ([new-chan] (recur new-chan)))))
    
    (defn create-workers
      [n c cc]
      (dotimes [n n]
        (let [tap-chan (chan)]
          (a/tap cc tap-chan)
          (create-worker c tap-chan n))))
    
    (create-workers 5 rand-int-chan control-chan-mult)
    

    因此,我们将创建 5 个工作循环,将其结果放在 rand-int-chan 上,我们将为它们提供一个“控制通道”。我将让您自行探索multtap,但简而言之,我们正在创建一个可以设置值的单一频道,然后将该值广播到所有点击它的频道。

    在我们的工作循环中,我们做两件事之一:将一个值放在我们创建它时使用的rand-int-chan 上,或者我们将从这个控制通道中取出一个值。我们可以巧妙地让工作线程知道放置其值的通道已更改,方法是实际上将新通道交给它,然后它将在下一次循环中绑定。所以,看看它的实际效果:

    (<!! rand-int-chan)
    
    => 6
    Worker2 generated value.
    

    这将从通道中获取随机整数,并且工作线程将打印它已生成一个值,以查看确实有多个线程参与其中。

    现在,假设我们要更改频道以放置随机整数。没问题,我们这样做:

    (def new-rand-int-chan (chan))
    (>!! control-chan new-rand-int-chan)
    (close! rand-int-chan) ;; for good measure, may not be necessary
    

    我们创建频道,然后将该频道放到我们的control-chan 上。当我们这样做时,每个工作线程都会执行其alt! 的第二部分,它简单地循环回到go-loop 的顶部,除了这一次,put-chan 将绑定到new-rand-int-chan 我们刚收到。所以现在:

    (<!! new-rand-int-chan)
    
    => 3
    Worker1 generated value.
    

    这给了我们整数,这正是我们想要的。任何从旧频道&lt;!! 的尝试都会得到nil,因为我们关闭了频道:

    (<!! rand-int-chan)
    ; nil
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-07-11
      • 1970-01-01
      • 1970-01-01
      • 2015-07-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多