【发布时间】:2016-08-22 14:48:22
【问题描述】:
我有一个 System.Timers.Timer,它每 3 秒消逝一次。
一旦它过去了,我想取出我收藏中的所有项目并分批处理它们。
这样做的动机是减少后端系统上的 I/O 数量。
挑战在于我有多个并发线程附加到集合/队列中。因此,我考虑过使用ConcurrentQueue<T> - 但这是一个糟糕的选择。
这个article on social msdn很好地描述了这里的问题。
我需要一个集合/队列,我可以在其中一次获取所有数据(ToArray())并在一个原子操作中清除队列,这样我就不会丢失写入集合/队列的任何数据同时由其他线程。
private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
string[] result = _queue.ToArray();
_queue = new ConcurrentQueue<string>(); // strings will be lost :-)
}
我倾向于在简单的Queue<T> 上使用简单的基于锁的方法。
private static readonly object _myLock = new object();
private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
string[] result;
lock (_myLock)
{
result = _queue.ToArray();
_queue.Clear();
}
}
现在这段代码有一个明显的缺陷,可以在生产者代码中看到:
private static void ProduceItems()
{
//while (!_stop)
for(int i=0; i<int.MaxValue; i++)
{
if (_stop) break;
lock (_myLock) // bad. locks out other producers running on other threads.
{
Console.WriteLine("Enqueue " + i);
_queue.Enqueue("string" + i);
}
Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
}
}
当然,这段代码会锁定任何其他试图追加到队列的生产者。如果“T1_Elapsed”锁已设置,我有什么方法可以验证生产者中的锁吗?
还有什么更适合我的问题吗?也许有什么可观察的?还是有什么好的“批处理器/聚合器”示例?
更新 1:RX
真棒你可以用 RX 做什么 :)
在这种情况下,我仍在研究如何处理错误、重试或重新入队。
internal class Rx
{
internal static void Start()
{
ISubject<int> subject = new Subject<int>();
ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!
var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
.Subscribe((item) => ProcessBatch(item));
for (int i=1; i<int.MaxValue; i++)
{
syncedSubject.OnNext(i);
Thread.Sleep(200);
Console.WriteLine($"Produced {i}.");
}
Console.ReadKey();
subscription.Dispose();
}
private static void ProcessBatch(IList<int> list)
{
// Aggregate many into one
string joined = string.Join(" ", list);
// Process one
Console.WriteLine($"Wrote {joined} to remote storage.");
// how do you account for errors here?
myProducer.ReEnqueueMyFailedItems(list); // ?
}
}
【问题讨论】:
标签: c# multithreading collections locking