【问题标题】:Batch processor (aggregate items out of a queue)批处理器(从队列中聚合项目)
【发布时间】:2016-08-22 14:48:22
【问题描述】:

我有一个 System.Timers.Timer,它每 3 秒消逝一次。
一旦它过去了,我想取出我收藏中的所有项目并分批处理它们。

这样做的动机是减少后端系统上的 I/O 数量。

挑战在于我有多个并发线程附加到集合/队列中。因此,我考虑过使用ConcurrentQueue<T> - 但这是一个糟糕的选择。

这个article on social msdn很好地描述了这里的问题。

我需要一个集合/队列,我可以在其中一次获取所有数据(ToArray())并在一个原子操作中清除队列,这样我就不会丢失写入集合/队列的任何数据同时由其他线程。

 private static void T1_Elapsed(object sender, ElapsedEventArgs e)
 {
    string[] result = _queue.ToArray();
   _queue = new ConcurrentQueue<string>(); // strings will be lost :-)
 }

我倾向于在简单的Queue&lt;T&gt; 上使用简单的基于锁的方法。

 private static readonly object _myLock = new object();

 private static void T1_Elapsed(object sender, ElapsedEventArgs e)
 {
     string[] result;
     lock (_myLock)
     {
         result = _queue.ToArray();
         _queue.Clear();
     }
 }

现在这段代码有一个明显的缺陷,可以在生产者代码中看到:

private static void ProduceItems()
{
    //while (!_stop)
    for(int i=0; i<int.MaxValue; i++)
    {
        if (_stop) break;

        lock (_myLock) // bad. locks out other producers running on other threads.
        {
            Console.WriteLine("Enqueue " + i);
            _queue.Enqueue("string" + i);
        }

        Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
    }
}

当然,这段代码会锁定任何其他试图追加到队列的生产者。如果“T1_Elapsed”锁已设置,我有什么方法可以验证生产者中的锁吗?

还有什么更适合我的问题吗?也许有什么可观察的?还是有什么好的“批处理器/聚合器”示例?

更新 1:RX
真棒你可以用 RX 做什么 :)
在这种情况下,我仍在研究如何处理错误、重试或重新入队。

internal class Rx
{
    internal static void Start()
    {
        ISubject<int> subject = new Subject<int>();
        ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!

        var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
            .Subscribe((item) => ProcessBatch(item));

        for (int i=1; i<int.MaxValue; i++)
        {
            syncedSubject.OnNext(i);
            Thread.Sleep(200);
            Console.WriteLine($"Produced {i}.");
        }

        Console.ReadKey();
        subscription.Dispose();
    }

    private static void ProcessBatch(IList<int> list)
    {
        // Aggregate many into one
        string joined = string.Join(" ", list);

        // Process one
        Console.WriteLine($"Wrote {joined} to remote storage.");

        // how do you account for errors here?
        myProducer.ReEnqueueMyFailedItems(list); // ?
    }
}

【问题讨论】:

    标签: c# multithreading collections locking


    【解决方案1】:

    TPL 数据流

    我想说试试 TPL DataFlow 库。它建立在任务并行库的基础上,专为并发性起着重要作用的这类需求而设计。有关此库的一系列博客文章,请参阅 http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html

    BatchBlock 似乎很适合您的场景。教程见https://msdn.microsoft.com/en-us/library/hh228602(v=vs.110).aspx

    另一个使用BatchBlock的例子: https://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/

    您将发布到可用的 TPL 数据流块之一,而不是将数据发布到队列。

    另一个选项可能正在使用

    反应式扩展

    请参阅http://www.introtorx.com/uat/content/v1.0.10621.0/01_WhyRx.html 以获得良好的介绍

    它也提供批处理支持:

    void Sample()
    {
        var dataprovider = new Subject<int>();
    
        var subscription = dataprovider
            .Buffer(TimeSpan.FromMinutes(3))
            .Subscribe(listOfNumbers => 
            {
                // do something with batch of items
                var batchSize = listOfNumbers.Count;
            });
    
        for(int i = 0; i <= 5; ++i)
        {
            dataprovider.OnNext(i);
        }
    
        subscription.Dispose();
    }
    

    在上面的示例中,您需要进行一些修改以使来自不同线程的多个生产者添加数据,请参阅reactive extension OnNext。它是简化的代码(!),但它可以让您大致了解使用 RX 的概念。

    可以使用最大缓冲区大小、给定时间段或两者的组合来完成缓冲。所以它也可以代替你的计时器。

    您在Subject 上调用OnNext 而不是将项目添加到队列中

    TPL DataFlow 和 RX 都消除了使用队列或类似需要清除的东西,因此它会让您摆脱这种痛苦。

    【讨论】:

    • 我知道那里有一些东西 ;-) RX 扩展给我留下了深刻的印象。你的例子真的很有帮助。现在我只需要弄清楚任何重试/失败机制如何适应这个。
    • 您可以将它们发送到另一个 RX 流并在该特定订阅操作中处理它们。也有内置的错误处理策略,但这在一定程度上取决于您在发生故障时需要采取什么样的行动。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-05
    • 1970-01-01
    • 2012-11-12
    • 1970-01-01
    • 2015-05-05
    相关资源
    最近更新 更多