【发布时间】:2016-09-20 11:09:36
【问题描述】:
我现在正在使用 TPL Dataflow,我需要实现自己的操作块。
这个动作块应该接受来自两个不同输入块的消息,将这些消息放入单个队列,然后按顺序处理这个队列。这里的重点是两个不同的任务不应该同时执行,我不想使用锁。
这是我的解决方案,但它不能正常工作。
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
where TInputLhs : class
where TInputRhs : class
{
public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }
private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();
private ITargetBlock<object> queue;
public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
{
queue = new ActionBlock<object>(x =>
{
if (x is TInputLhs)
{
actionLhs(x as TInputLhs);
}
else
{
actionRhs(x as TInputRhs);
}
});
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
}
public void Complete()
{
queue.Complete();
}
public Task Completion
{
get { return queue.Completion; }
}
public void Fault(Exception exception)
{
queue.Fault(exception);
}
}
简单使用示例:
static void Main(string[] args)
{
var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));
var batchBlock = new BatchBlock<string>(3);
var processInOrderBlock = new OrderedActionBlock<string, string[]>(
new Action<string>((str) =>
{
Console.WriteLine(str);
}),
new Action<string[]>((batch) =>
{
Console.WriteLine("BATCH - " + string.Join(", ", batch));
}));
splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });
for (int i = 1; i <= 10; i++)
{
splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
}
splitBlock.Complete();
processInOrderBlock.Completion.Wait();
return;
}
输出:
xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .
看起来消息卡在batchBlock 中。我不知道为什么。
【问题讨论】:
-
为什么你不能只使用一个并行限制为一个的常规 ActionBlock?你几乎有,只是缺少限制。
-
@usr,在这种情况下,我应该将检查消息类型(是单个消息还是批处理)的代码提取到用户代码中,而我想将此类基础架构代码保留在库中的某个位置.此外,我不喜欢接受“某些对象”(即 ActionBlock
-
你能说清楚为什么不能只设置 maxparallelism = 1 吗?这和批处理有什么关系?
-
@usr,ActionBlock 不能接受来自两个不同源块的不同消息(
Message来自一个,Message[]来自另一个)并对它们执行不同的操作。我这里需要自定义块 -
在您的
queue = new ActionBlock...行中,您正在这样做:创建一个接受两个不同类型的不同来源的 ActionBlock。现在为该块配置 maxparallelism = 1。
标签: c# tpl-dataflow