【问题标题】:TPL Dataflow: process messages from two incoming blocks sequentially(TPL 数据流:按顺序处理来自两个传入块的消息)
【发布时间】:2016-09-20 11:09:36
【问题描述】:

我现在正在使用 TPL Dataflow,我需要实现自己的操作块。

这个动作块应该接受来自两个不同输入块的消息,将这些消息放入单个队列,然后按顺序处理这个队列。这里的重点是两个不同的任务不应该同时执行,我不想使用锁。

这是我的解决方案,但它不能正常工作。

/// <summary>
/// Dataflow block with two typed inputs (<see cref="InputLhs"/> and <see cref="InputRhs"/>)
/// whose messages are funnelled into one internal <c>ActionBlock&lt;object&gt;</c> and
/// dispatched to <c>actionLhs</c> or <c>actionRhs</c> depending on the runtime type.
/// </summary>
/// <remarks>
/// NOTE(review): both input buffers are linked to the internal queue with
/// <c>PropagateCompletion = true</c>. The queue therefore appears to be completed as soon
/// as EITHER buffer completes, so messages still on their way through the other input can
/// be declined — this matches the truncated output reported for this sample.
/// </remarks>
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
    where TInputLhs : class
    where TInputRhs : class
{
    // Public, statically typed entry points; each exposes the corresponding private buffer.
    public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
    public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }


    // Intermediate buffers, one per input type.
    private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
    private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();

    // Single sink that receives the messages of both buffers as plain objects.
    private ITargetBlock<object> queue;

    /// <summary>
    /// Creates the block and wires both input buffers to the shared queue.
    /// </summary>
    /// <param name="actionLhs">Invoked for every message that is a <typeparamref name="TInputLhs"/>.</param>
    /// <param name="actionRhs">Invoked for every other message, cast with <c>as</c> to <typeparamref name="TInputRhs"/>.</param>
    public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
    {
        queue = new ActionBlock<object>(x =>
        {
            // The left-hand type is tested first, so when a value satisfies both
            // type parameters it is always routed to actionLhs.
            if (x is TInputLhs)
            {
                actionLhs(x as TInputLhs);
            }
            else
            {
                // No second type test here: anything that is not a TInputLhs is passed
                // on, and "as" yields null if the value is not a TInputRhs either.
                actionRhs(x as TInputRhs);
            }
        });

        // NOTE(review): each link propagates completion on its own; see the remarks on the class.
        inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
        inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    /// <summary>
    /// Completes the internal queue directly; the two input buffers are not touched.
    /// </summary>
    public void Complete()
    {
        queue.Complete();
    }

    /// <summary>
    /// Completion task of the internal queue.
    /// </summary>
    public Task Completion
    {
        get { return queue.Completion; }
    }

    /// <summary>
    /// Faults the internal queue with <paramref name="exception"/>.
    /// </summary>
    public void Fault(Exception exception)
    {
        queue.Fault(exception);
    }
}

简单使用示例:

// Demo pipeline: even-length strings go straight to the ordered block, odd-length
// strings are grouped into batches of three first, and both streams end up in the
// same OrderedActionBlock.
static void Main(string[] args)
{
    Predicate<string> hasEvenLength = s => s.Length % 2 == 0;
    var splitBlock = new SplitBlock<string>(hasEvenLength);
    var batchBlock = new BatchBlock<string>(3);

    Action<string> printSingle = str => Console.WriteLine(str);
    Action<string[]> printBatch = batch =>
    {
        string joined = string.Join(", ", batch);
        Console.WriteLine("BATCH - " + joined);
    };
    var processInOrderBlock = new OrderedActionBlock<string, string[]>(printSingle, printBatch);

    // Every link forwards completion downstream.
    splitBlock.SourceFiltered.LinkTo(
        processInOrderBlock.InputLhs,
        new DataflowLinkOptions { PropagateCompletion = true });
    splitBlock.SourceNonFiltered.LinkTo(
        batchBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    batchBlock.LinkTo(
        processInOrderBlock.InputRhs,
        new DataflowLinkOptions { PropagateCompletion = true });

    // Feed "x", "xx", ... up to ten characters.
    foreach (int length in Enumerable.Range(1, 10))
    {
        splitBlock.Post(new string('x', length));
    }

    splitBlock.Complete();

    // Block until the ordered block reports completion.
    processInOrderBlock.Completion.Wait();
}

输出:

xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .

看起来消息卡在batchBlock 中。我不知道为什么。

【问题讨论】:

  • 为什么你不能只使用一个并行限制为一个的常规 ActionBlock?你几乎有,只是缺少限制。
  • @usr,在这种情况下,我就必须把检查消息类型(是单条消息还是批次)的代码放到用户代码中,而我希望把这类基础架构代码保留在库里。此外,我不喜欢接受“任意对象”的块(即 ActionBlock<object>),更喜欢静态类型的 ActionBlock。
  • 你能说清楚为什么不能只设置 maxparallelism = 1 吗?这和批处理有什么关系?
  • @usr,ActionBlock 不能接受来自两个不同源块的不同消息(Message 来自一个,Message[] 来自另一个)并对它们执行不同的操作。我这里需要自定义块
  • 在您的queue = new ActionBlock... 行中,您正在这样做:创建一个接受两个不同类型的不同来源的 ActionBlock。现在为该块配置 maxparallelism = 1。

标签: c# tpl-dataflow


【解决方案1】:

当 inputLhs 或 inputRhs 中的任何一个完成时,queue 似乎就会随之完成(如果在链接时使用了 PropagateCompletion = true 选项)。

所以,我们需要改变这个:

// NOTE(review): each of these links forwards completion to queue on its own.
inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });

改成这样:

// Keep both links so messages still flow into queue, but do not let either
// link propagate completion by itself.
inputLhs.LinkTo(queue);
inputRhs.LinkTo(queue);

// Finish queue only once BOTH inputs are done. A fault in either input is
// forwarded instead of being turned into a normal completion.
Task.WhenAll(InputLhs.Completion, InputRhs.Completion)
    .ContinueWith(inputsDone =>
    {
        if (inputsDone.IsFaulted)
        {
            queue.Fault(inputsDone.Exception);
        }
        else
        {
            queue.Complete();
        }
    }, TaskScheduler.Default); // explicit scheduler: do not depend on the ambient TaskScheduler.Current

【讨论】:

    【解决方案2】:

    您可以有一个ActionBlock,它接受带有两个值的ValueTuple,加上一个指示两个值中哪一个是有效值的索引:

    // A single ActionBlock receives tagged tuples: the first element tells which
    // of the two payload slots carries the real value.
    var block = new ActionBlock<(int, Type1, Type2)>(entry =>
    {
        int index = entry.Item1;
        if (index == 1)
        {
            DoSomething1(entry.Item2);
        }
        else if (index == 2)
        {
            DoSomething2(entry.Item3);
        }
        else
        {
            throw new NotImplementedException();
        }
    });

    // The unused slot is filled with its default value.
    block.Post((1, someValue1, default));
    block.Post((2, default, someValue2));
    

    这样就去掉了两个中间的 BufferBlock,可以确保处理顺序与消息投递(Post)的顺序完全一致。

    为了让它更漂亮、更不容易出错,你可以创建一个类似于 OrderedActionBlock 的类,但其中的 ITargetBlock<TInputLhs> 和 ITargetBlock<TInputRhs> 属性是“假”的:它们不是真正的块,而只是单个 ActionBlock 的传播器外观(facade)。从一种 ITargetBlock 转换为另一种有点棘手,但是可行的。下面是一个通用的实现。ActionBlock<TInput1, TInput2> 在其 Target1 和 Target2 都完成时完成,因此从链接的源传播完成应该能按预期工作。

    /// <summary>
    /// An action block with two statically typed inputs. Messages offered to
    /// <see cref="Target1"/> and <see cref="Target2"/> are tagged and forwarded to one
    /// inner <c>ActionBlock</c>, which runs <c>action1</c> or <c>action2</c> accordingly.
    /// </summary>
    /// <remarks>
    /// Completing a target (for example through a link with <c>PropagateCompletion</c>)
    /// only marks that target as done; the inner block is completed once both targets
    /// are. Calling <see cref="Complete()"/> on this block completes the inner block directly.
    /// </remarks>
    public class ActionBlock<TInput1, TInput2> : IDataflowBlock
    {
        private readonly ITargetBlock<(int, TInput1, TInput2)> _actionBlock;

        // Per-target "completed" flags (index 0 = Target1, index 1 = Target2),
        // guarded by _completeLock.
        private readonly object _completeLock = new object();
        private readonly bool[] _completeState = new bool[2];

        /// <summary>Completion task of the inner action block.</summary>
        public Task Completion => _actionBlock.Completion;

        /// <summary>Completes the inner action block without consulting the per-target flags.</summary>
        public void Complete() => _actionBlock.Complete();

        void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);

        /// <summary>Input for <typeparamref name="TInput1"/> messages.</summary>
        public ITargetBlock<TInput1> Target1 { get; }

        /// <summary>Input for <typeparamref name="TInput2"/> messages.</summary>
        public ITargetBlock<TInput2> Target2 { get; }

        /// <summary>Creates the block from two asynchronous handlers.</summary>
        /// <param name="action1">Handler for messages received through <see cref="Target1"/>.</param>
        /// <param name="action2">Handler for messages received through <see cref="Target2"/>.</param>
        /// <param name="options">Options for the inner block; a default instance is used when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action1"/> or <paramref name="action2"/> is null.</exception>
        public ActionBlock(Func<TInput1, Task> action1, Func<TInput2, Task> action2,
            ExecutionDataflowBlockOptions options = null)
        {
            if (action1 == null) throw new ArgumentNullException(nameof(action1));
            if (action2 == null) throw new ArgumentNullException(nameof(action2));
            options = options ?? new ExecutionDataflowBlockOptions();

            _actionBlock = new ActionBlock<(int, TInput1, TInput2)>(entry =>
            {
                var (index, item1, item2) = entry;
                return index switch // switch expression (C# 8.0 syntax)
                {
                    1 => action1(item1),
                    2 => action2(item2),
                    _ => throw new NotImplementedException()
                };
            }, options);

            // Each facade tags its messages with its index and reports its own
            // completion through Complete(int) instead of completing the inner block.
            this.Target1 = new TargetConverter<TInput1, (int, TInput1, TInput2)>(
                _actionBlock, x => (1, x, default), () => Complete(1));

            this.Target2 = new TargetConverter<TInput2, (int, TInput1, TInput2)>(
                _actionBlock, x => (2, default, x), () => Complete(2));
        }

        /// <summary>Creates the block from two synchronous handlers.</summary>
        /// <param name="action1">Handler for messages received through <see cref="Target1"/>.</param>
        /// <param name="action2">Handler for messages received through <see cref="Target2"/>.</param>
        /// <param name="options">Options for the inner block; a default instance is used when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action1"/> or <paramref name="action2"/> is null.</exception>
        public ActionBlock(Action<TInput1> action1, Action<TInput2> action2,
            ExecutionDataflowBlockOptions options = null) : this(
                ToAsync(action1, nameof(action1)),
                ToAsync(action2, nameof(action2)), options) { }

        // Wraps a synchronous handler as a Task-returning one. The null check has to
        // happen here: once wrapped, the delegate is never null, so the check in the
        // main constructor could not detect a missing handler any more.
        private static Func<T, Task> ToAsync<T>(Action<T> action, string paramName)
        {
            if (action == null) throw new ArgumentNullException(paramName);
            return item => { action(item); return Task.CompletedTask; };
        }

        // Records that target number `index` (1 or 2) has completed and completes the
        // inner block once both targets have.
        private void Complete(int index)
        {
            bool completed;
            lock (_completeLock)
            {
                _completeState[index - 1] = true;
                completed = _completeState[0] && _completeState[1];
            }
            // Called outside the lock so that no foreign code runs while it is held.
            if (completed) _actionBlock.Complete();
        }
    }
    
    /// <summary>
    /// Adapts an <c>ITargetBlock&lt;TTo&gt;</c> so that it can be used as an
    /// <c>ITargetBlock&lt;TFrom&gt;</c>: every offered message is passed through a
    /// conversion delegate and then offered to the wrapped block.
    /// </summary>
    public class TargetConverter<TFrom, TTo> : ITargetBlock<TFrom>
    {
        private readonly ITargetBlock<TTo> _parent;
        // NOTE(review): these two fields are public in the original and are left that
        // way to keep the type's surface unchanged; the nested SourceProxy reads _convert.
        public readonly Func<TFrom, TTo> _convert;
        public readonly Action _completeAction;

        /// <summary>Creates a converter in front of <paramref name="parent"/>.</summary>
        /// <param name="parent">The block that finally receives the converted messages.</param>
        /// <param name="convert">Maps an incoming message to the parent's message type.</param>
        /// <param name="completeAction">
        /// Optional replacement for completing the parent: when supplied, completing this
        /// target invokes the callback and does NOT complete <paramref name="parent"/>.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="parent"/> or <paramref name="convert"/> is null.</exception>
        public TargetConverter(ITargetBlock<TTo> parent, Func<TFrom, TTo> convert,
            Action completeAction = null)
        {
            if (parent == null) throw new ArgumentNullException(nameof(parent));
            if (convert == null) throw new ArgumentNullException(nameof(convert));
            _parent = parent;
            _convert = convert;
            _completeAction = completeAction;
        }

        // Completion and faulting are delegated to the parent block.
        Task IDataflowBlock.Completion => _parent.Completion;

        void IDataflowBlock.Complete()
        {
            if (_completeAction != null)
            {
                _completeAction();
            }
            else
            {
                _parent.Complete();
            }
        }

        void IDataflowBlock.Fault(Exception ex) => _parent.Fault(ex);

        DataflowMessageStatus ITargetBlock<TFrom>.OfferMessage(
            DataflowMessageHeader messageHeader, TFrom messageValue,
            ISourceBlock<TFrom> source, bool consumeToAccept)
        {
            // The parent talks back to the source (consume/reserve/release) in terms of
            // TTo, so a real source is wrapped in a proxy that translates those calls.
            return _parent.OfferMessage(messageHeader,
                _convert(messageValue),
                source != null ? new SourceProxy(source, this) : null,
                consumeToAccept);
        }

        // Presents an ISourceBlock<TFrom> to the parent as an ISourceBlock<TTo>.
        // Only the members needed for the message hand-off are supported.
        private class SourceProxy : ISourceBlock<TTo>
        {
            private readonly ISourceBlock<TFrom> _source;
            private readonly TargetConverter<TFrom, TTo> _target;

            public SourceProxy(ISourceBlock<TFrom> source,
                TargetConverter<TFrom, TTo> target)
            {
                _source = source;
                _target = target;
            }

            TTo ISourceBlock<TTo>.ConsumeMessage(
                DataflowMessageHeader messageHeader,
                ITargetBlock<TTo> target,
                out bool messageConsumed)
            {
                // The real source knows the converter, not the parent, as its target.
                TFrom raw = _source.ConsumeMessage(messageHeader, _target,
                    out messageConsumed);

                // When the source did not hand the message over, `raw` carries no
                // meaningful value, so the converter must not be run on it.
                return messageConsumed ? _target._convert(raw) : default;
            }

            bool ISourceBlock<TTo>.ReserveMessage(
                DataflowMessageHeader messageHeader,
                ITargetBlock<TTo> target)
            {
                return _source.ReserveMessage(messageHeader, _target);
            }

            void ISourceBlock<TTo>.ReleaseReservation(
                DataflowMessageHeader messageHeader,
                ITargetBlock<TTo> target)
            {
                _source.ReleaseReservation(messageHeader, _target);
            }

            Task IDataflowBlock.Completion => throw new NotSupportedException();
            void IDataflowBlock.Complete() => throw new NotSupportedException();
            void IDataflowBlock.Fault(Exception exception)
                => throw new NotSupportedException();

            IDisposable ISourceBlock<TTo>.LinkTo(
                ITargetBlock<TTo> target,
                DataflowLinkOptions linkOptions) => throw new NotSupportedException();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-10-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多