【问题标题】:Clojure agents consuming from a queueClojure 代理从队列中消费
【发布时间】:2011-02-05 20:44:02
【问题描述】:

我正在尝试找出使用代理来使用消息队列 (Amazon SQS) 中的项目的最佳方式。现在我有一个函数(process-queue-item),它从队列中抓取一个项目,并处理它。

我想同时处理这些项目,但我不知道如何控制代理。基本上我想让所有代理尽可能忙,而不是从队列中拉出许多项目并产生积压(我将在几台机器上运行这个,所以项目需要留在队列中,直到他们真的需要)。

谁能给我一些改进我的实现的建议?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

【问题讨论】:

  • 有什么方法可以将消息队列视为 seq,然后只使用 pmap 来获得并行化?
  • @Alex Stoddard:在我的情况下,process-queue-item 实际上阻塞了网络 IO,所以我认为 pmap 不是正确的选择,因为它只使用与机器内核一样多的线程.
  • @erikw:当然,但这只是一个 pmap 实现细节(线程 = #cores + 2)。没有理由您不能编写具有参数化线程数的 pmap 版本。看pmap源码的第一行:(let [n (+ 2 (..Runtime getRuntime availableProcessors))
  • 您好,我有几个问题: 1. 代理有一个价值,您对他们的价值感兴趣还是只是将其用作线程池? 2.队列消耗有最终结果还是process-queue-item有副作用?
  • @cgrand: 1) 我对代理的价值不感兴趣,只是将它们用作线程池。 2) process-queue-item 具有 seid-effects(将结果推回消息队列)。

标签: concurrency clojure queue


【解决方案1】:
(let [switch (atom true) ; a switch to stop workers
      workers (doall 
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch 
                             (retrieve item from Amazon SQS and process)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done

【讨论】:

  • 这不再适用于 1.4(futurefuture-call 不会返回 IFn,这是 repeatedly 所要求的)。不过,您可以通过在 (future 前面加上 # 来轻松地将未来包装在函数中。
  • @AlexB 很好,这甚至不是 1.4 的问题:# 应该在那里。我修复了代码,谢谢!
【解决方案2】:

您要求的是一种继续分发任务但有一定上限的方法。一种简单的方法是使用信号量来协调限制。以下是我的处理方法:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

现在您只需要协调任务本身的执行方式即可。听起来您已经有了执行此操作的机制。循环(提交-未来(进程-队列-项))

【讨论】:

    【解决方案3】:

    也许您可以使用seque 函数?引用(doc seque):

    clojure.core/seque
    ([s] [n-or-q s])
      Creates a queued seq on another (presumably lazy) seq s. The queued
      seq will produce a concrete seq in the background, and can get up to
      n items ahead of the consumer. n-or-q can be an integer n buffer
      size, or an instance of java.util.concurrent BlockingQueue. Note
      that reading from a seque can block if the reader gets ahead of the
      producer.
    

    我想到的是通过网络获取队列项的惰性序列;您可以将其包装在seque 中,将其放入 Ref 并让工作人员代理从该seque 中消费项目。 seque 返回的东西从你的代码的角度来看就像一个常规的 seq,队列魔法以透明的方式发生。请注意,如果您放入的序列是分块的,那么它仍然会一次强制一个块。另请注意,对 seque 的初始调用本身似乎会阻塞,直到获得一两个初始项目(或一个块,视情况而定;我认为这与惰性序列的工作方式有关,而不是 seque 本身,不过)。

    代码草图(真的粗略,根本没有测试):

    (defn get-queue-items-seq []
      (lazy-seq
       (cons (get-queue-item)
             (get-queue-items-seq))))
    
    (def task-source (ref (seque (get-queue-items-seq))))
    
    (defn do-stuff []
      (let [worker (agent nil)]
        (if-let [result
                 (dosync
                   (when-let [task (first @task-source)]
                    (send worker (fn [_] (do-stuff-with task)))))]
          (do (await worker)
              ;; maybe do something with worker's state
              (do-stuff))))) ;; continue working
    
    (defn do-lots-of-stuff []
      (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
        fs)))
    

    实际上,您可能想要一个更复杂的队列项目 seq 生产者,以便您可以要求它停止生产新项目(如果整个事情要能够优雅地关闭,这是必要的;期货将消亡当任务源干涸时,使用future-done? 看看他们是否已经这样做了)。这只是我第一眼就能看到的东西……我敢肯定这里还有更多需要改进的地方。不过,我认为一般方法会起作用。

    【讨论】:

    • 我在代码草图的最后一行添加了一个修复程序,以便实际创建期货。 (对整个想法至关重要,真的...... :-))
    • 我正在尝试理解这段代码。为什么任务源是参考?您似乎根本不会更改它。
    • @Siddhartha Reddy:乍一看,我会说这就是我将代码称为“really sketchy”的原因。 ;-) 我想它需要dosync 内的when-let 中的(alter task-source rest)(或next)才有用。其实,再想一想,我不知道在这里使用seque到底是不是一个好主意;现在在我看来,它增加了队列中的项目数量,这些项目在本地机器崩溃的情况下会丢失(因为seque 在工作人员请求之前将项目拉入)。再说一次,在某些情况下,它的性能可能很好;那是
    • 只是预感需要分析。顺便说一句,do-stuff 中的尾部自调用应更改为(recur),以免炸毁堆栈。我认为我在大约一个月后的 this answer 中描述了 Clojure 中的队列处理方面做得更好;我想知道它是否比这个对你更有用?无论如何,感谢您指出这个错误!
    【解决方案4】:

    不确定这是多么地道,因为我仍然是该语言的新手,但以下解决方案对我有用:

    (let [number-of-messages-per-time 2
          await-timeout 1000]
      (doseq [p-messages (partition number-of-messages-per-time messages)]
        (let [agents (map agent p-messages)]
          (doseq [a agents] (send-off a process))
          (apply await-for await-timeout agents)
          (map deref agents))))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-02
      • 2022-01-04
      • 1970-01-01
      相关资源
      最近更新 更多