【发布时间】:2018-11-27 10:31:40
【问题描述】:
我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出将由一个消费者消费。
为此,我使用System.Threading.Tasks.Dataflow.BufferBlock<T>
创建了一个BufferBlock 对象。一个Consumer 正在监听这个BufferBlock,并处理任何接收到的输入。
同时有两个 'Producerssend data to theBufferBlock`
简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
我想先启动消费者,然后将生产者作为单独的任务启动:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
乍一看,这正是生产者-消费者的含义:多个生产者生产数据,而消费者正在消费产生的数据。
然而,BufferBlock about thread safety 说:
不保证任何实例成员都是线程安全的。
我认为 TPL 中的 P 表示并行! 我应该担心吗?我的代码不是线程安全的吗? 我应该使用不同的 TPL 数据流类吗?
【问题讨论】:
-
MSDN 文章中的“线程安全”部分是出了名的不可靠。只是从文章模板中复制/粘贴,作者往往无法从开发人员那里获得足够的信息以使其更准确。请记住,仅仅因为 DataBlock 是线程安全的,并不会自动使您自己的代码也成为线程安全的。当另一个线程忙于添加项目时,像 Count 属性这样的东西是毫无价值的。希望显而易见,但未在文档中明确说明。
-
为什么是
BufferBlock而不是ActionBlock?async Task Consume()可以只替换为Process(...)调用,例如ActionBLock<int>(x => Process(x))
标签: c# task-parallel-library producer-consumer dataflow tpl-dataflow