这是一种略有不同的方法。这个问题的棘手部分是如何知道BatchBlock<T> 何时发出批处理,以便停用内部计时器。我选择的解决方案是在每次ITargetBlock<T[]> 链接到BatchBlock<T> 时拦截TargetWrapper,并将TargetWrapper 接收到的批次传播到真正的目标。
下面的TimeoutBatchBlock<T> 类提供了BatchBlock<T> 的全部功能。它具有所有 API 并支持所有选项。它是BatchBlock<T> 实例的薄包装器(每个链接目标加上一个TargetWrapper 实例)。
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to batchSize, or when a timeout period has elapsed after receiving the first
/// item in the batch.
/// </summary>
public class TimeoutBatchBlock<T> : IPropagatorBlock<T, T[]>,
IReceivableSourceBlock<T[]>
{
private readonly BatchBlock<T> _source;
private readonly TimeSpan _timeout;
private readonly Timer _timer;
private bool _timerEnabled;
public TimeoutBatchBlock(int batchSize, TimeSpan timeout,
GroupingDataflowBlockOptions dataflowBlockOptions)
{
// Arguments validation omitted
_source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
_timeout = timeout;
_timer = new Timer(_ => _source.TriggerBatch());
_timerEnabled = false;
}
public TimeoutBatchBlock(int batchSize, TimeSpan timeout) : this(batchSize,
timeout, new GroupingDataflowBlockOptions())
{ }
public int BatchSize => _source.BatchSize;
public TimeSpan Timeout => _timeout;
public Task Completion => _source.Completion;
public int OutputCount => _source.OutputCount;
public void Complete() => _source.Complete();
void IDataflowBlock.Fault(Exception exception)
=> ((IDataflowBlock)_source).Fault(exception);
public IDisposable LinkTo(ITargetBlock<T[]> target,
DataflowLinkOptions linkOptions)
{
return _source.LinkTo(new TargetWrapper(target, this), linkOptions);
}
private class TargetWrapper : ITargetBlock<T[]>
{
private readonly ITargetBlock<T[]> _realTarget;
private readonly TimeoutBatchBlock<T> _parent;
public TargetWrapper(ITargetBlock<T[]> realTarget, TimeoutBatchBlock<T> parent)
{
_realTarget = realTarget;
_parent = parent;
}
public Task Completion => _realTarget.Completion;
public void Complete() => _realTarget.Complete();
public void Fault(Exception exception) => _realTarget.Fault(exception);
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
T[] messageValue, ISourceBlock<T[]> source, bool consumeToAccept)
{
var offerResult = _realTarget.OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
if (offerResult == DataflowMessageStatus.Accepted)
_parent.DeactivateTimerIfActive(); // The block emitted a new batch
return offerResult;
}
}
public void TriggerBatch() => _source.TriggerBatch();
public bool TryReceive(Predicate<T[]> filter, out T[] item)
=> _source.TryReceive(filter, out item);
public bool TryReceiveAll(out IList<T[]> items)
=> _source.TryReceiveAll(out items);
private void SetTimerState(bool enabled)
{
lock (_timer)
{
if (enabled == _timerEnabled) return;
_timer.Change(
enabled ? _timeout : System.Threading.Timeout.InfiniteTimeSpan,
System.Threading.Timeout.InfiniteTimeSpan);
_timerEnabled = enabled;
}
}
private void ActivateTimerIfInactive() => SetTimerState(true);
private void DeactivateTimerIfActive() => SetTimerState(false);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(
DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
bool consumeToAccept)
{
var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
if (offerResult == DataflowMessageStatus.Accepted)
ActivateTimerIfInactive(); // The block received a new message
return offerResult;
}
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
=> ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
target, out messageConsumed);
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
=> ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
=> ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}