就迭代一个太大而无法放入内存的非常大的数据集而言,您可以使用生产者-消费者模型。我在处理包含数十亿条记录的自定义数据集时使用了类似的东西——总共大约 2 TB 的数据。
这个想法是有一个包含生产者和消费者的类。当您创建该类的新实例时,它会启动一个填充受约束并发队列的生产者线程。并且该线程使队列保持满。消费者部分是让您获取下一条记录的 API。
您从共享并发队列开始。为此,我喜欢 .NET BlockingCollection。
这是一个读取文本文件并维护 10,000 个文本行的队列的示例。
public class TextFileLineBuffer
{
private const int QueueSize = 10000;
private BlockingCollection<string> _buffer = new BlockingCollection<string>(QueueSize);
private CancellationTokenSource _cancelToken;
private StreamReader reader;
public TextFileLineBuffer(string filename)
{
// File is opened here so that any exception is thrown on the calling thread.
_reader = new StreamReader(filename);
_cancelToken = new CancellationTokenSource();
// start task that reads the file
Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
}
public string GetNextLine()
{
if (_buffer.IsCompleted)
{
// The buffer is empty because the file has been read
// and all lines returned.
// You can either call this an error and throw an exception,
// or you can return null.
return null;
}
// If there is a record in the buffer, it is returned immediately.
// Otherwise, Take does a non-busy wait.
// You might want to catch the OperationCancelledException here and return null
// rather than letting the exception escape.
return _buffer.Take(_cancelToken.Token);
}
private void ProcessFile()
{
while (!_reader.EndOfStream && !_cancelToken.Token.IsCancellationRequested)
{
var line = _reader.ReadLine();
try
{
// This will block if the buffer already contains QueueSize records.
// As soon as a space becomes available, this will add the record
// to the buffer.
_buffer.Add(line, _cancelToken.Token);
}
catch (OperationCancelledException)
{
;
}
}
_buffer.CompleteAdding();
}
public void Cancel()
{
_cancelToken.Cancel();
}
}
这就是它的基本原理。您需要添加一个 Dispose 方法,以确保线程终止并关闭文件。
我已经在许多不同的程序中使用了这种基本方法,效果很好。您必须进行一些分析和测试以确定您的应用程序的最佳缓冲区大小。您需要足够大的东西来跟上正常的数据流并处理突发的活动,但又不能大到超出您的内存预算。
IEnumerable 修改
如果你想支持IEnumerable<T>,你必须做一些小的修改。我将扩展我的示例以支持IEnumerable<String>。
首先,您必须更改类声明:
public class TextFileLineBuffer: IEnumerable<string>
那么,你必须实现GetEnumerator:
public IEnumerator<String> GetEnumerator()
{
foreach (var s in _buffer.GetConsumingEnumerable())
{
yield return s;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
这样,您可以初始化事物,然后将其传递给任何需要IEnumerable<string> 的代码。于是就变成了:
var items = new TextFileLineBuffer(filename);
DoSomething(items);
void DoSomething(IEnumerable<string> list)
{
foreach (var s in list)
Console.WriteLine(s);
}