【问题标题】:How can I improve this Clojure Component+async example?如何改进这个 Clojure 组件+异步示例?
【发布时间】:2016-11-04 23:36:02
【问题描述】:

我想弄清楚如何最好地创建异步组件,或以组件友好的方式容纳异步代码。这是我能想到的最好的了,而且……感觉不太对劲。

要点:接受文字,uppercase 他们和reverse 他们,最后是print 他们。

问题 1:我无法让 system 停在最后。我希望看到 println 个人 c-chans 停止,但不要。

问题 2:如何正确注入 deps。进入producer/consumer fns?我的意思是,它们不是组件,我认为它们应该是组件,因为它们没有合理的生命周期。

问题 3:我如何惯用地处理名为 a>bb>casync/pipeline-creating 副作用? pipeline 应该是一个组件吗?

(ns pipelines.core
  (:require [clojure.core.async :as async
             :refer [go >! <! chan pipeline-blocking close!]]
            [com.stuartsierra.component :as component]))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
  (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(defn b>c [b> c>]
  (pipeline-blocking 4
                     c>
                     (map (comp (partial apply str)
                                reverse))
                     b>))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
  (doseq [word ["apple" "banana" "carrot"]]
    (go (>! a> word))))

(defn consumer [c>]
  (go (while true
        (println "Your Word Is: " (<! c>)))))



;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
  (let [c-chan (reify component/Lifecycle
                 (start [this]
                   (println "starting chan: " this)
                   (chan 1))
                 (stop [this]
                   (println "stopping chan: " this)
                   (close! this)))]
    (-> (component/system-map
         :a> c-chan
         :b> c-chan
         :c> c-chan)
        (component/using {}))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_      (reset! system (component/start (pipeline-system {})))
      _      (a>b (:a> @system) (:b> @system))
      _      (b>c (:b> @system) (:c> @system))
      _      (producer (:a> @system))
      _      (consumer (:c> @system))
      _      (component/stop @system)])

编辑

我开始考虑以下问题,但我不太确定它是否正确关闭...

(extend-protocol component/Lifecycle
  clojure.core.async.impl.channels.ManyToManyChannel
  (start [this]
    this)
  (stop [this]
    (close! this)))

【问题讨论】:

    标签: clojure components core.async


    【解决方案1】:

    我稍微重写了你的示例以使其可重新加载

    可重载流水线

    (ns pipeline
      (:require [clojure.core.async :as ca :refer [>! <!]]
                [clojure.string :as s]))
    
    (defn upverse [from to]
      (ca/pipeline-blocking 4
                            to
                            (map (comp s/upper-case
                                       s/reverse))
                            from))
    (defn produce [ch xs]
      (doseq [word xs]
        (ca/go (>! ch word))))
    
    (defn consume [ch]
      (ca/go-loop []
                  (when-let [word (<! ch)]
                    (println "your word is:" word)
                    (recur))))
    
    (defn start-engine []
      (let [[from to] [(ca/chan) (ca/chan)]]
        (upverse to from)
        (consume from)
        {:stop (fn []
                 (ca/close! to)
                 (ca/close! from)
                 (println "engine is stopped"))
         :process (partial produce to)}))
    

    这样你就可以做(start-engine)并用它来处理单词序列:

    REPL 时间

    boot.user=> (require '[pipeline])
    
    boot.user=> (def engine (pipeline/start-engine))
    #'boot.user/engine
    

    与它一起运行

    boot.user=> ((engine :process) ["apple" "banana" "carrot"])
    
    your word is: TORRAC
    your word is: ANANAB
    your word is: ELPPA
    
    boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
    
    your word is: OD
    your word is: SEKAM
    your word is: ESNES
    your word is: TAHW
    

    停止它

    boot.user=> ((:stop engine))
    engine is stopped
    
    ;; engine would not process anymore
    boot.user=> ((engine :process) ["apple" "banana" "carrot"])
    nil
    

    状态管理

    根据您打算如何使用此管道,可能根本不需要像 Component 这样的状态管理框架:无需添加任何“以防万一”,在这种情况下启动和停止管道只需调用两个功能。

    但是,如果在具有更多状态的大型应用中使用此管道,您肯定可以从状态管理库中受益。

    我是not a fan of Component 主要是因为它需要一个完整的应用程序购买(这使它成为一个框架),但我尊重其他使用它的人。

    挂载

    如果应用程序很小,我建议不要使用任何特定的东西:例如,您可以将此管道与其他管道/逻辑组合并从 -main 启动它,但如果应用程序更大并且具有更多不相关的状态,您只需将mount 添加到其中:

    (defstate engine :start (start-engine)
                     :stop ((:stop engine)))
    

    启动管道

    boot.user=> (mount/start)
    {:started ["#'pipeline/engine"]}
    

    与它一起运行

    boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
    
    your word is: OD
    your word is: SEKAM
    your word is: ESNES
    your word is: TAHW
    

    停止它

    boot.user=> (mount/stop)
    engine is stopped
    {:stopped ["#'pipeline/engine"]}
    

    这是一个gist with a full example,其中包括build.boot

    你可以通过boot repl下载和玩它


    [编辑]:回答 cmets

    如果您已经迷上了 Component,这应该可以帮助您入门:

    (defrecord WordEngine []
      component/Lifecycle
    
      (start [component]
        (merge component (start-engine)))
    
      (stop [component]
        ((:stop component))
        (assoc component :process nil :stop nil)))
    

    这将在开始时创建一个 WordEngine 对象,该对象将具有 :process 方法

    您将无法像调用普通 Clojure 函数一样调用它:即从 REPL 或任何命名空间仅通过 :requireing 调用它,除非您传递对不推荐的整个系统的引用。

    所以为了调用它,这个WordEngine 需要插入到一个组件系统中,然后注入另一个组件,然后它可以解构:process 函数并调用它。

    【讨论】:

    • Mount+Async 的好例子,就像你说的,它可能更适合较小的用例。在为 this 工作选择 Component 之前,我实际上花了很多时间在 Component Mount 上,即因为我喜欢“框架”提供的东西。对我来说,这是一个已经“购买”到 Component 的更大系统的一部分,我真的很想为组件运行一个示例,尽管我会听从它,因为它是一个很棒的 Mount 示例! (另外,我喜欢它是 boot,我打算最终切换 :)
    • 我所说的较小的情况是什么都不使用:只是 Clojure。 Mount 目前在许多大型应用程序中使用非常成功。事实上,一周前我刚刚听到一家公司将他们的 20K LOC 应用程序从 Component 切换到 Mount 并且非常高兴。我在答案中添加了一个示例组件示例。
    • 感谢您一直以来倡导和支持Mount!与我对 stuart sierra 和 weavejester (james?) 对 Component 的理解相比,我怀疑他们解决这个玩具问题的方法看起来更通用,即只是一个 channelpipeline 组件(带有 @ 987654346@s?),而不是WordEngine,并纠正您对它征收的一些(否则有效的)批评。尽管如此,肯定会有权衡,你说服我给mount更多的机会,所以谢谢你!我很感激它!
    • 当然,Josh,非常欢迎 :) 使其成为带有“boundary”协议的channel / pipeline 与组件或安装(正交选择)无关,因为这只是一个可以在两者中实现的设计选择。在您的示例/用例中,我认为这种抽象级别会有点过头了(我认为)。
    猜你喜欢
    • 1970-01-01
    • 2016-07-19
    • 2023-04-06
    • 1970-01-01
    • 1970-01-01
    • 2018-01-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多