【问题标题】:C# ConcurrentDictionary vs. ManualResetEvent for Thread Control用于线程控制的 C# ConcurrentDictionary 与 ManualResetEvent
【发布时间】:2014-10-30 04:11:32
【问题描述】:

我有一个 Windows 服务,它使用带有回调的 System.Threading.Timer 来更新端点,如下所示:

UpdateEndpointTimer = new Timer(
                new TimerCallback(UpdateSBEndpoints),
                Endpoint.Statuses.Online,
                EndpointUpdateFrequency,
                EndpointUpdateFrequency);

我的更新方法大致如下:

private void UpdateSBEndpoints(object state)
{
    ...
    using (var context = new TestHarnessContext())
    {
        var endpoints = context.Endpoints.Where(p =>
            p.Binding == Endpoint.Bindings.ServiceBus
            && p.State == Endpoint.States.Enabled
            && p.Status != status).ToList();

         foreach (var endpoint in endpoints)
         {
             //Do stuff here
         }
         ...
}

现在,由于计时器使用 ThreadPool 中的线程来触发回调,我需要采取措施来控制线程。当多个线程可以在第一个线程完成工作之前从数据库中获取相同的端点时,就会出现一个特定的问题,这会导致在 foreach 循环中完成重复的工作。

我知道该问题的两种可能的解决方案,我想知道哪一种更好,更可取。解决方案是ConcurrentDictionaryManualResetEvent

在第一种情况下,我会将它放在我的 foreach 循环中,以确保一次只有一个线程可以在给定端点上运行:

if (EndpointsInAction.TryAdd(endpoint.Id, endpoint.Id) == false)
    // If we get here, another thread has started work with this endpoint.
    return;
...
//Do stuff with the endpoint, once done, remove its Id from the dictionary
...
int id;
EndpointsInAction.TryRemove(endpoint.Id, out id);

在第二种情况下,我会像这样控制线程:

protected ManualResetEvent PubIsBeingCreated { get; set; }
protected ManualResetEvent SubIsBeingCreated { get; set; }
...
this.PubIsBeingCreated = new ManualResetEvent(true);
this.SubIsBeingCreated = new ManualResetEvent(true);
...
foreach (var endpoint in endpoints)
{
   if (!this.PubIsBeingCreated.WaitOne(0))
// If we get here, another thread has started work with this endpoint.
        return;

   try
   {
       // block other threads (Timer Events)
       PubIsBeingCreated.Reset();
       // Do stuff
   }
   ...
   finally
   {
       // Restore access for other threads
       PubIsBeingCreated.Set();
   }
}

现在这两种方法似乎都有效我想知道哪种方法更可取(更有效?)。我倾向于使用ConcurrentDictionary,因为它允许对线程进行更精细的过滤,即不允许两个线程与特定端点一起工作,而不允许两个线程与特定端点type一起工作(pubs和 ManualResetEvents 中的子项)。可能有另一种解决方案优于我的解决方案,因此任何信息将不胜感激。

【问题讨论】:

  • 只需将计时器的 period 参数设置为 0。在方法结束时调用 Change() 以重新启动它。现在它永远不会重叠。
  • 如果我正确理解您的问题,计时器会在第一次回调完成之前第二次触发。您可以使用我在此问题的答案中描述的技术来防止这种情况发生:stackoverflow.com/questions/17996147/…

标签: c# multithreading


【解决方案1】:

在任何情况下,您都不应为此目的使用 ManualResetEvent。使用提供更高级别抽象的对象总是更好,但即使您想在低级别控制它,使用 .NET 的 Monitor 类(通过 lock 语句,以及 Monitor.Wait()Monitor.Pulse() 方法也会比使用ManualResetEvent 更好。

您似乎有一个相当标准的生产者/消费者场景。在这种情况下,在我看来,与ConcurrentDictionary 相比,使用ConcurrentQueue 会更好。您的生产者会将事物排入队列,而您的消费线程会将它们出列以进行处理。

【讨论】:

  • 我应该提供有关更新方法的更多详细信息。基本上,它会定期在数据库中搜索未处理的端点,并在找到它们时根据端点类型创建服务总线发布者和订阅者。我想我明白你的意思,但现在我想知道如何处理循环更新。如果我要放弃计时器(它本质上是多线程的,无法控制提供的线程),保留自动更新功能的最佳方法是什么?无论如何,非常感谢您不鼓励使用 ManualResetEvent。
  • 我认为没有必要放弃使用计时器。从原始帖子中我不清楚为什么要同时处理更新。但是假设 Hans 推断这是因为计时器的运行速度比处理单个计时器事件的速度快,那么他的建议应该有效。或者,您可以延长时间间隔(同时仍在同步,以防您确实获得事件的并发处理)。
【解决方案2】:

我会做不同的事;我会使用 ConcurrentBag 并让一个线程从数据库加载它,多个线程通过 TryTake 监听它。加载包的单线程将解决您的问题,他们可能会从数据库中获取相同的项目。下面将在控制台应用程序中进行说明:

class Program
{
    private static ConcurrentBag<int> _bag;

    public static void Main()
    {
        // Construct and populate the ConcurrentBag
        _bag = new ConcurrentBag<int>();
        var bagPopulateTask = new Task(PopulateBag);
        bagPopulateTask.Start();

        var bagEmptyTask1 = new Task(EmptyBag);
        var bagEmptyTask2 = new Task(EmptyBag);
        var bagEmptyTask3 = new Task(EmptyBag);

        bagEmptyTask1.Start();
        bagEmptyTask2.Start();
        bagEmptyTask3.Start();

        Console.ReadKey();
    }

    private static void EmptyBag()
    {
        int i;
        while (true)
        {
            if (_bag.TryTake(out i))
            {
                Console.WriteLine("Thread {0} took {1}", Thread.CurrentThread.ManagedThreadId, i);
            }
            else
            {
                Console.WriteLine("Thread {0} took nothing", Thread.CurrentThread.ManagedThreadId);
            }
            Thread.Sleep(50);
        }
    }

    private static void PopulateBag()
    {
        var i = 0;

        while (i < 1000)
        {
            Console.WriteLine("Thread {0} added {1}", Thread.CurrentThread.ManagedThreadId, i);
            _bag.Add(i);
            i++;
            Thread.Sleep(i);
        }
    }
}

请注意,如果顺序很重要,您可以使用 ConcurrentQueue 或 ConcurrentStack 进行类似操作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-10-27
    • 1970-01-01
    • 1970-01-01
    • 2010-11-08
    • 2018-05-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多