【问题标题】:How to implement a parallel logical or with early termination in Clojure如何在 Clojure 中实现并行逻辑或提前终止
【发布时间】:2019-09-19 08:48:27
【问题描述】:

我想定义一个谓词,将一些谓词作为输入 具有相应的输入(它们可以作为惰性调用序列给出), 并行运行它们并计算结果的逻辑或, 这样,当谓词调用终止返回true时, 整个计算也终止(返回true)。

除了提供时间优化,这也有助于避免 在某些情况下不终止(某些谓词调用可能不会终止)。 实际上,将不终止解释为第三个undefined 值, 此谓词模拟 Kleene's K3 logic 中的 or 操作 (初始居中Kleene algebra中的join)。

here 为 Haskell 家族提供了类似的东西。 在 Clojure 中是否有任何(最好是简单的)方法可以做到这一点?

编辑:我决定在阅读 cmets 后添加一些说明。

(a) 首先,线程池耗尽后发生的事情并不重要。我认为创建一个足够大的线程池来满足我们的需求是一个合理的约定。

(b) 最关键的要求是谓词调用开始并行运行,一旦谓词调用终止返回true,所有其他运行的线程都会中断。预期的行为是:

  • 如果有谓词调用返回true:并行或返回true
  • else 如果存在未终止的谓词调用:并行或未终止
  • else:并行或返回false

换句话说,它的行为类似于falseundefinedtrue 给出的三元素格中的连接,undefined 表示非终止。

(c) 并行的 or 应该能够将许多谓词和许多谓词输入(每个都对应一个谓词)作为输入。但如果将惰性序列作为输入会更好。然后,命名并行或pany(用于“并行任何”),我们可以进行如下调用:

  • (pany (map (comp eval list) predicates inputs))
  • (pany (map (comp eval list) predicates (repeat input)))
  • (pany (map (comp eval list) (repeat predicate) inputs)) 相当于 (pany (map predicate (unchunk inputs)))

作为最后的评论,我认为要求 pany、双重 pall 或用于构建此类早期终止并行缩减以易于实施甚至内置的机制是很自然的Clojure 等面向并行的语言。

【问题讨论】:

  • 经过几次尝试,我最终得到了this。它将在可调整大小的 Java 线程池中运行谓词检查,并在找到结果后立即跳过进一步的检查。问题是,我没有发布答案的原因是,如果没有找到真正的结果,这段代码将无限期挂起,因为承诺永远不会被交付,所以取消承诺将永远阻塞。我找不到解决方案,但我想我会发布我所拥有的,以防它可以构建。
  • 我想问题是如果前 n 个结果未能终止,其中 n 是您的线程池的大小,那么我们将永远不会返回。认为它可能需要可中断的操作。
  • 您希望在找到结果后取消当前正在运行的检查,还是只想阻止新的检查开始?如果是后者,请参阅我的new version of check-in-par。我从promise 获得了一些灵感,修复了之前的问题。不幸的是,它需要知道谓词列表的长度。我不明白你怎么能避免这种情况。您必须以某种方式知道检查何时完成才能报告失败。
  • @Carcigenicate 我认为任何解决方案都将涉及做一些道德上等同于计算输入的事情。更让我担心的是:如果有 N 个(这里是 4 个)或更多任务永远无法完成怎么办?您的线程池充满了卡住的线程。在我看来,Fork/Join 和 CompletableFuture 在这里可能会有所帮助,但我不确定:他们可能不喜欢工作线程只是爬进一个洞。
  • @amalloy 你可以从execute 切换到使用submit,如果你有一个合理的执行时间上限(我认为返回的Futures 允许这样做),那么任务就会超时。你也可以从一个固定的线程池切换到一个像newCachedThreadPool返回的更动态的线程池。如果任务有可能永远无法完成,那么添加超时是我能看到的唯一选择,因为扩展池只会允许更多无限的作业一次坐在池中。

标签: clojure parallel-processing functional-programming logic terminate


【解决方案1】:

我将根据归约函数来定义我们的谓词。实际上,我们可以重新实现所有 Clojure 迭代函数来支持这种并行操作,但我将仅使用 reduce 作为示例。

我将定义一个计算函数。我会用同一个,但没有什么能阻止你拥有很多。如果累积 1000,则该函数为“真”。

(defn computor [acc val]
        (let [new (+' acc val)] (if (> new 1000) (reduced new) new)))

(reduce computor 0 (range))
;; =>
1035

(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation

;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

现在让我们开始讨论这个问题。我想在每次计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,一次一步使用 pmap 非常慢 - 工作单元太小,不值得线程化。在这里,在继续之前,我已经更改了每个工作单元的 1000 次迭代。您可能会根据您的工作量和步骤成本来调整它。

(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

然后我将其包装在驱动程序中。

(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))

(p-or predicates)
;; =>
1035

在哪里插入 CPU 并行度? p-or-reducer* 中的 s/map/pmap/ 应该这样做。我建议只并行化第一个操作,因为这将驱动归约序列进行计算。

(defn p-or-reducer* [reductions]
        (let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))

(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

很难定义一个高性能的通用版本。在不知道每次迭代的成本的情况下,很难得出有效的并行策略——如果一次迭代需要 10 秒,那么我们可能一次只走一步。如果需要 100ns,那么我们需要一次采取许多步骤。

【讨论】:

    【解决方案2】:

    您是否会考虑采用core.async 处理与async/goasync/thread 的并行任务,并与async/alts! 提前返回?

    例如,将核心or 函数从串行变为并行。我们可以创建一个宏(我称之为por)将输入函数(或谓词)包装到async/thread 中,然后在它们之上执行套接字选择async/alts!

    (defmacro por [& fns]
      `(let [[v# c#] (async/alts!!
                      [~@(for [f fns]
                           (list `async/thread f))])]
         v#))
    
    (time
     (por (do (println "running a") (Thread/sleep 30) :a)
          (do (println "running b") (Thread/sleep 20) :b)
          (do (println "running c") (Thread/sleep 10) :c)))
    ;; running a
    ;; running b
    ;; running c
    ;; "Elapsed time: 11.919169 msecs"
    ;; => :c
    

    与原来的or(串行运行)相比:

    (time
     (or (do (println "running a") (Thread/sleep 30) :a)
         (do (println "running b") (Thread/sleep 20) :b)
         (do (println "running c") (Thread/sleep 10) :c)))
    ;; running a
    ;; => :a
    ;; "Elapsed time: 31.642506 msecs"
    

    【讨论】:

    • 如果n个谓词没有停止,n高于core.async线程池会发生什么?
    • 解决这个问题的一种方法是使用一个函数启动并行 or 子句,该函数在超时后返回默认答案,然后我们可以确定调用者总是会得到答案。处理失控/长时间运行的任务需要对任务本身有一定的了解(即如何取消任务)。对于从go 通道获取输入的go 块任务,我将简单地使用另一个通道作为控制信号提前终止(使用alts!)。
    • 即使调用者得到答案,工作线程仍然会海狸!
    • 如果你将函数分解成一个 go 块,迭代器可以在其中接收“step”消息或“terminate”消息,那么我想你就在那里。关键是将循环控制外部化,以便终止在协调者手中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-07
    相关资源
    最近更新 更多