【发布时间】:2011-02-05 20:44:02
【问题描述】:
我正在尝试找出使用代理来使用消息队列 (Amazon SQS) 中的项目的最佳方式。现在我有一个函数(process-queue-item),它从队列中抓取一个项目,并处理它。
我想同时处理这些项目,但我不知道如何控制代理。基本上我想让所有代理尽可能忙,而不是从队列中拉出许多项目并产生积压(我将在几台机器上运行这个,所以项目需要留在队列中,直到他们真的需要)。
谁能给我一些改进我的实现的建议?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
【问题讨论】:
-
有什么方法可以将消息队列视为 seq,然后只使用 pmap 来获得并行化?
-
@Alex Stoddard:在我的情况下,process-queue-item 实际上阻塞了网络 IO,所以我认为 pmap 不是正确的选择,因为它只使用与机器内核一样多的线程.
-
@erikw:当然,但这只是一个 pmap 实现细节(线程 = #cores + 2)。没有理由您不能编写具有参数化线程数的 pmap 版本。看pmap源码的第一行:(let [n (+ 2 (..Runtime getRuntime availableProcessors))
-
您好,我有几个问题: 1. 代理有一个价值,您对他们的价值感兴趣还是只是将其用作线程池? 2.队列消耗有最终结果还是process-queue-item有副作用?
-
@cgrand: 1) 我对代理的价值不感兴趣,只是将它们用作线程池。 2) process-queue-item 具有 seid-effects(将结果推回消息队列)。
标签: concurrency clojure queue