【问题标题】:Managing states across multiple threads跨多个线程管理状态
【发布时间】:2018-05-05 15:25:49
【问题描述】:

我有一个带有一堆序列的 F# 类。该类包含一个简单的next() 方法,它返回当前序列中的下一个元素。如果当前序列的所有元素都已返回,则它将转移到下一个序列。该类包含一个指针,该指针是序列中的下一个元素以及它从哪个序列返回。

我目前仅限于公开next() 方法。

一些上游类将在不同线程之间使用我的类(同一个对象实例)。这将使观点不同步,因为多个线程都应该从头开始。我知道这并不理想,但这是我目前必须解决的问题。

例子:

Thread 1 next(): return elem. A Thread 1 next(): return elem. B Thread 2 next(): return elem. A Thread 1 next(): return elem. C Thread 2 next(): return elem. B

有没有办法跟踪每个线程的指针?

我一直在考虑使用Threading.Thread.CurrentThread.ManagedThreadId 作为 Map 中的键,然后返回指针(并相应地在那里更新它)。我有点担心这个 Map 的线程安全性以及两个线程是否同时更新它们的状态。

我希望 somone 可以为我提供一些关于如何让它发挥作用的想法。

【问题讨论】:

    标签: .net multithreading f# thread-safety


    【解决方案1】:

    这可以通过使用MailboxProcessor 来管理状态,然后使用一个类从消费者那里抽象出MailboxProcessor 来实现。如果您跨多个线程共享一个实例,它们将以线程安全的方式看到彼此的更新。如果您为每个线程使用专用实例,他们将只能看到自己的更新。代码应该是这样的:

    // Add whatever other commands you need
    type private SequenceMessage = Next of AsyncReplyChannel<int>
    
    type IntSequence() =
        let agent = MailboxProcessor<SequenceMessage>.Start
                    <| fun inbox ->
                        let rec loop state =
                            async {
                                let! message = inbox.Receive()
                                // Add other matches as requried
                                match message with
                                | Next channel -> 
                                    let newState = state + 1
                                    channel.Reply(newState)
                                    return! loop newState
                            }
                        loop 0
    
        let next () =
            agent.PostAndReply <| fun reply -> Next reply
    
        let asyncNext () =
            agent.PostAndAsyncReply <| fun reply -> Next reply
    
        member __.Next () = next ()
        member __.AsyncNext () = asyncNext ()
    

    然后,要以每个线程都能看到其他线程的更新的方式使用它,您可以执行与此等效的操作:

    // To share state across multiple threads, use the same instance
    let sequence = IntSequence()
    [1..10]
    |> List.map (fun _ -> sequence.AsyncNext())
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.iter (fun i -> printfn "%d" i)
    

    哪些打印:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    要以每个线程只看到自己的更新的方式使用它,您只需将前面的示例更改为如下所示:

    // To use a dedicate state for each thread, create a new instance
    [1..10]
    |> List.map (fun _ -> IntSequence())
    |> List.map (fun sequence -> sequence.AsyncNext())
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.iter (fun i -> printfn "%d" i)
    

    哪些打印:

    1
    1
    1
    1
    1
    1
    1
    1
    1
    1
    

    【讨论】:

      猜你喜欢
      • 2018-12-19
      • 2019-10-08
      • 2013-06-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-12-17
      相关资源
      最近更新 更多