【问题标题】:Clojure multithreading output streamsClojure 多线程输出流
【发布时间】:2013-10-18 18:20:08
【问题描述】:

我是 Clojure 的新手。我有多个线程试图写入输出流,如果我没记错套接字并且它们的流不是线程安全的,这意味着如果我同时写入它们,位可能会混淆。 clojure 的主要好处之一是对竞争条件的内置并发处理。我如何在我的场景中使用它?

我尝试研究原子、引用等。我最初认为将输出流声明为原子会起作用,但我不太确定,因为它似乎避免同时更改原子状态(使用交换!)但是我认为你可以从多个线程中取消引用原子,这意味着多个线程将deref 持有我的输出流的原子并同时写入它。

任何建议都会很有帮助。

提前致谢

(defn send-my-data [output data-bytes]
  (try 
    (.write output)
    (.flush output)
    (catch Exception excp
       (println (format "issue %s" (.printStackTrace excp))))

现在我所有的线程在想将数据写入输出流时都会调用这个函数

【问题讨论】:

    标签: multithreading sockets concurrency clojure outputstream


    【解决方案1】:

    代理通常被认为是完成此类任务的正确工具。他们采用一系列任务在其内部状态上运行,并按照接收到的顺序运行它们。它们还可以很好地与 Clojure 的 STM 的其余部分配合使用。例如在事务中发送到代理表单的消息只发送一次,并且仅在事务提交时发送。

    user> (let [output-agent (agent "")] 
            (dotimes [x 10] 
              (send output-agent (fn [_] (println "hello" x)))))
    nil
    hello 0
    hello 1
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    hello 7
    hello 8
    hello 9
    

    在此示例中,要执行的操作是一个匿名函数,它忽略输入并仅打印一些内容。

    【讨论】:

    • 根据我提供的示例代码,我该如何实现您的方法。我想我很难掌握代理数据类型的概念,因为我试图将它与我使用过的其他基本数据类型进行比较。我不完全确定在上面的例子中实际的输出代理做了什么。对不起,如果它是一个新手问题。我真的很想了解 clojure 并发的强大功能。
    • 在这个例子中,输出代理的内容完全没有被使用,输出代理被用作一个任务队列,它可以很好地处理事务,并将收集任何异常以供以后检查。每次调用 send 都会将要运行的函数放到代理输入队列中,然后代理线程池按照它们在线程上从特殊代理线程池接收到的顺序运行输入队列中的函数。如果您要发送长时间运行的作业或需要限制并行任务,则值得了解 send-off 和 send-via 功能。
    【解决方案2】:

    如果您需要确保没有其他线程正在使用某个对象(并且希望在您的代码中等待并且在该特定线程中不做任何事情直到该对象被解锁以便您可以锁定),您可以使用locking它)。

    user> (dotimes [i 10] (future (println \h \e \l \l \o)))
    hhh h e
    nil
     eh  le  l lo 
    h e lh he  e ll h  el  ll  lo 
    e  e l l oh
    l l oo
    l  ol
    
    l lo o
    
     e
    
    o
     l l o
    
    user> (dotimes [i 10] (future (locking *out* (println \h \e \l \l \o))))
    h
    nil
     e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    h e l l o
    
    user> 
    

    【讨论】:

      【解决方案3】:

      我最终实现了代理方法,因为我认为它更符合习惯并且代理的其他一些好处。

      (let [wtr (agent (.getOutputStream mysocket) "agent.log")]
          (defn log [msg]
            (letfn [(write [out msg]
                   (.write out msg)
                   (.flush out)
                      out)]
            (send-off wtr write msg)))
        (defn close []
              (send wtr #(.close %))))
      

      改编自here

      记住在代理接受返回值时返回输出流。一个常见的错误。

      谢谢 亚瑟·乌尔费尔特

      【讨论】:

        猜你喜欢
        • 2015-10-26
        • 1970-01-01
        • 1970-01-01
        • 2013-12-17
        • 1970-01-01
        • 2011-12-23
        • 2020-01-21
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多