【Question title】: Batching on duration or threshold using TPL Dataflow
【Posted】: 2019-03-09 01:21:12
【Question description】:

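I have implemented a producer/consumer pattern using TPL Dataflow. The use case is code that reads messages from a Kafka bus. For efficiency, we need to process the messages in batches when going to the database.

Is there a way in TPL Dataflow to hold on to messages and fire whenever either a size threshold or a duration threshold is reached?

For example, the current implementation posts each message as soon as it is pulled from the queue.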

    postedSuccessfully = targetBuffer.Post(msg.Value);

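【Question comments】:

  • Use BatchBlock. BatchBlock collects messages until the batch size is reached and then emits them as one group for downstream processing.
  • Thanks. BatchBlock will collect the messages. I also need to emit messages if a certain time threshold is reached. Is it possible to specify either a maximum number of messages or a timeout threshold?
  • There is no timeout out of the box, but you can use a timer to flush it. There are also max-groups and capacity options, which may help with your other requirements.
  • @AshishBhatia Why not use Reactive Extensions? Buffer allows batching by count or by time span, e.g. mySequence.Buffer(TimeSpan.FromSeconds(1))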
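
The timer-based flush mentioned in the comments relies on BatchBlock<T>.TriggerBatch(), which emits whatever is currently queued even if the batch is not full. The following is a minimal sketch of that mechanism only, with no timer involved; the class and method names (TriggerBatchDemo, FlushPartialBatchAsync) are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TriggerBatchDemo
{
    // Hypothetical helper: posts fewer items than the batch size,
    // then forces the partial batch out.
    public static async Task<int[]> FlushPartialBatchAsync()
    {
        // The block would normally wait for 10 items.
        var batchBlock = new BatchBlock<int>(10);

        batchBlock.Post(1);
        batchBlock.Post(2);
        batchBlock.Post(3);

        // Emit the 3 queued items now instead of waiting for 10.
        batchBlock.TriggerBatch();

        return await batchBlock.ReceiveAsync();
    }

    public static async Task Main()
    {
        int[] batch = await FlushPartialBatchAsync();
        Console.WriteLine(string.Join(",", batch)); // prints "1,2,3"
    }
}
```

A timer callback that calls TriggerBatch() is what turns this into a timeout; calling it while nothing is queued produces no batch.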

Tags: c# task-parallel-library tpl-dataflow batching


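【Solution 1】:

Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its time span expires.

Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers with the DataflowBlock.AsObservable() and AsObserver() extension methods.

This makes building a buffering block very easy: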

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);

}

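This method uses two buffer blocks to buffer the input and the output. Buffer() reads from the input block (the observable) and writes to the output block (the observer) whenever the batch is full or the time span expires.

By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default), we tell it to process the data on a task pool thread.

Example

This code creates a buffer block for 5 items or 1 second. It first posts 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console along with the thread ID: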

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    //Await the last block in the pipeline so every batch is printed before continuing
    await printBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6

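【Discussion】:

  • Is it possible to wait for outBlock to be empty before writing more data?
  • That is already available. You can specify a bound on any block through the BoundedCapacity option and post to it with await block.SendAsync. If the block is full, SendAsync waits asynchronously. Post returns false in the same situation. If you set BoundedCapacity=1, the method will publish a new buffer only once the previous one has been processed.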
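
The difference between Post and SendAsync on a bounded block can be seen in a small sketch. It uses a plain BufferBlock<int> with BoundedCapacity = 1 rather than the CreateBuffer method above, and the class and method names (BoundedCapacityDemo, RunAsync) are invented for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Hypothetical helper that returns a summary of what happened.
    public static async Task<string> RunAsync()
    {
        // A block that holds at most one item at a time.
        var block = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // The block is empty, so the first Post is accepted.
        bool first = block.Post(1);

        // The block is now full, so Post is declined immediately.
        bool second = block.Post(2);

        // SendAsync does not give up: its task stays pending
        // until the block has room again.
        Task<bool> pending = block.SendAsync(3);
        bool completedWhileFull = pending.IsCompleted;

        // Taking the first item out frees the slot for the pending send.
        int received = await block.ReceiveAsync();
        bool third = await pending;

        return $"first={first}, second={second}, " +
               $"completedWhileFull={completedWhileFull}, " +
               $"received={received}, third={third}";
    }

    public static async Task Main()
    {
        // prints "first=True, second=False, completedWhileFull=False, received=1, third=True"
        Console.WriteLine(await RunAsync());
    }
}
```

In a producer loop this means a Post result must be checked or the message is silently lost, whereas awaiting SendAsync gives back-pressure.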
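【Solution 2】:

Although there is no timeout out of the box, you can wire a timer up to TriggerBatch so that it fires whenever the downstream pipeline has waited long enough for a batch. Then reset the timer whenever a batch flows through. BatchBlock takes care of the rest for you.

As an example, the sample below is configured so that every batch ends up with a size of 1, even though the batch block would normally wait for 10 elements. The timeout forces the BatchBlock to emit whatever it currently holds.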

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);

        var timeOut = TimeSpan.FromSeconds(1);
        var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();            

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

Output:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1

【Discussion】:

    【Solution 3】:

    I guess you could use something like this. Basically it is just a BatchBlock and a timeout combined into one.

    BatchBlockEx

    public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>
    {
       private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();
    
       private readonly BatchBlock<T> _base;
    
       private readonly CancellationToken _cancellationToken;
    
       private readonly int _triggerTimeMs;
    
       public BatchBlockEx(int batchSize, int triggerTimeMs)
       {
          _triggerTimeMs = triggerTimeMs;
          _base = new BatchBlock<T>(batchSize);
          PollReTrigger();
       }
    
       public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
       {
          _triggerTimeMs = triggerTimeMs;
          _cancellationToken = dataflowBlockOptions.CancellationToken;
          _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
          PollReTrigger();
       }
    
       public int BatchSize => _base.BatchSize;
    
       public int OutputCount => _base.OutputCount;
    
       public Task Completion => _base.Completion;
    
       public void Complete() => _base.Complete();
    
       void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);
    
       public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);
    
       T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);
    
       void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);
    
       bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);
    
       DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
       {
          _asyncAutoResetEvent.Set();
          return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
       }
    
       public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);
    
       public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);
    
       public override string ToString() => _base.ToString();
    
       public void TriggerBatch() => _base.TriggerBatch();
    
       private void PollReTrigger()
       {
          async Task Poll()
          {
             try
             {
                while (!_cancellationToken.IsCancellationRequested)
                {
                   await _asyncAutoResetEvent.WaitAsync()
                                              .ConfigureAwait(false);
    
                   await Task.Delay(_triggerTimeMs, _cancellationToken)
                               .ConfigureAwait(false); 
                   TriggerBatch();
                }
             }
             catch (TaskCanceledException)
             {
                // nope
             }
          }
    
          Task.Run(Poll, _cancellationToken);
       }
    }
    

    AsyncAutoResetEvent

    public class AsyncAutoResetEvent
    {
       private static readonly Task _completed = Task.FromResult(true);
       private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
       private bool _signaled;
    
       public Task WaitAsync()
       {
          lock (_waits)
          {
             if (_signaled)
             {
                _signaled = false;
                return _completed;
             }
    
             var tcs = new TaskCompletionSource<bool>();
             _waits.Enqueue(tcs);
             return tcs.Task;
          }
       }
    
       public void Set()
       {
          TaskCompletionSource<bool> toRelease = null;
    
          lock (_waits)
             if (_waits.Count > 0)
                toRelease = _waits.Dequeue();
             else if (!_signaled)
                _signaled = true;
    
          toRelease?.SetResult(true);
       }
    }
    

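    【Discussion】:

      【Solution 4】:

      Here is a slightly different approach. The tricky part of this problem is knowing when the BatchBlock<T> has emitted a batch, so that the internal timer can be deactivated. The solution I chose is to intercept every ITargetBlock<T[]> that gets linked to the BatchBlock<T> with a TargetWrapper, and to have the TargetWrapper propagate the batches it receives to the real target.

      The TimeoutBatchBlock<T> class below offers the full functionality of BatchBlock<T>. It has all the APIs and supports all the options. It is a thin wrapper around a BatchBlock<T> instance (plus one TargetWrapper instance per linked target).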

      /// <summary>
      /// Provides a dataflow block that batches inputs into arrays.
      /// A batch is produced when the number of currently queued items becomes equal
      /// to batchSize, or when a timeout period has elapsed after receiving the first
      /// item in the batch.
      /// </summary>
      public class TimeoutBatchBlock<T> : IPropagatorBlock<T, T[]>,
          IReceivableSourceBlock<T[]>
      {
          private readonly BatchBlock<T> _source;
          private readonly TimeSpan _timeout;
          private readonly Timer _timer;
          private bool _timerEnabled;
      
          public TimeoutBatchBlock(int batchSize, TimeSpan timeout,
              GroupingDataflowBlockOptions dataflowBlockOptions)
          {
              // Arguments validation omitted
              _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
              _timeout = timeout;
              _timer = new Timer(_ => _source.TriggerBatch());
              _timerEnabled = false;
          }
      
          public TimeoutBatchBlock(int batchSize, TimeSpan timeout) : this(batchSize,
              timeout, new GroupingDataflowBlockOptions())
          { }
      
          public int BatchSize => _source.BatchSize;
          public TimeSpan Timeout => _timeout;
          public Task Completion => _source.Completion;
          public int OutputCount => _source.OutputCount;
      
          public void Complete() => _source.Complete();
      
          void IDataflowBlock.Fault(Exception exception)
              => ((IDataflowBlock)_source).Fault(exception);
      
          public IDisposable LinkTo(ITargetBlock<T[]> target,
              DataflowLinkOptions linkOptions)
          {
              return _source.LinkTo(new TargetWrapper(target, this), linkOptions);
          }
      
          private class TargetWrapper : ITargetBlock<T[]>
          {
              private readonly ITargetBlock<T[]> _realTarget;
              private readonly TimeoutBatchBlock<T> _parent;
      
              public TargetWrapper(ITargetBlock<T[]> realTarget, TimeoutBatchBlock<T> parent)
              {
                  _realTarget = realTarget;
                  _parent = parent;
              }
      
              public Task Completion => _realTarget.Completion;
              public void Complete() => _realTarget.Complete();
              public void Fault(Exception exception) => _realTarget.Fault(exception);
      
              public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
                  T[] messageValue, ISourceBlock<T[]> source, bool consumeToAccept)
              {
                  var offerResult = _realTarget.OfferMessage(messageHeader,
                      messageValue, source, consumeToAccept);
                  if (offerResult == DataflowMessageStatus.Accepted)
                      _parent.DeactivateTimerIfActive(); // The block emitted a new batch
                  return offerResult;
              }
          }
      
          public void TriggerBatch() => _source.TriggerBatch();
      
          public bool TryReceive(Predicate<T[]> filter, out T[] item)
              => _source.TryReceive(filter, out item);
      
          public bool TryReceiveAll(out IList<T[]> items)
              => _source.TryReceiveAll(out items);
      
          private void SetTimerState(bool enabled)
          {
              lock (_timer)
              {
                  if (enabled == _timerEnabled) return;
                  _timer.Change(
                      enabled ? _timeout : System.Threading.Timeout.InfiniteTimeSpan,
                      System.Threading.Timeout.InfiniteTimeSpan);
                  _timerEnabled = enabled;
              }
          }
          private void ActivateTimerIfInactive() => SetTimerState(true);
          private void DeactivateTimerIfActive() => SetTimerState(false);
      
          DataflowMessageStatus ITargetBlock<T>.OfferMessage(
              DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
              bool consumeToAccept)
          {
              var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
                  messageValue, source, consumeToAccept);
              if (offerResult == DataflowMessageStatus.Accepted)
                  ActivateTimerIfInactive(); // The block received a new message
              return offerResult;
          }
      
          T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
              ITargetBlock<T[]> target, out bool messageConsumed)
                  => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                      target, out messageConsumed);
      
          bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
              ITargetBlock<T[]> target)
                  => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);
      
          void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
              ITargetBlock<T[]> target)
                  => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
      }
      

      【Discussion】:

      • A custom BatchBlock<T> with a timer and slightly different behavior (BatchUntilInactiveBlock<T>) can be found here.