【发布时间】:2019-10-10 10:21:18
【问题描述】:
使用线程,您可以创建持久的、可重用的局部变量,这些变量对于客户端连接等非常有用。但是,对于 System.Threading.Tasks.Dataflow 中的 ActionBlock 之类的任务,似乎没有任何类型的操作块的持久性或可重用性。因此,对于涉及与客户端交互的 ActionBlock,我的理解是,您要么需要从头开始初始化客户端连接,要么在更高范围内重用一个客户端连接(使用锁定?)。
用例:我正在使用一个反转控制的 .NET 库。大部分逻辑(除了启动和关闭)必须位于名为 ProcessEventsAsync 的单个 Task 方法中,由库调用,该方法接收 IEnumerable 数据。 ProcessEventsAsync 必须对所有数据进行一些处理,然后将其发送给一些下游消费者。为了提高性能,我尝试使用 Tasks 并行化 ProcessEventsAsync 中的逻辑。我还想从这个任务中收集一些性能指标。
让我举一个详细的例子来说明我在做什么:
internal class MyClass
{
private String firstDownStreamConnectionString;
private String secondDownStreamConnectionString;
private SomeClient firstClient;
private SomeClient secondClient;
private ReportingClient reportingClient;
private int totalUnhandledDataCount;
public MyClass(String firstDownStreamConnectionString, String secondDownStreamConnectionString, String reportingClientKey)
{
this.firstDownStreamConnectionString = firstDownStreamConnectionString;
this.secondDownStreamConnectionString = secondDownStreamConnectionString;
this.DegreeOfParallelism = Math.Max(Environment.ProcessorCount - 1, 1);
this.reportingClient = new ReportingClient (reportingClientKey, DegreeOfParallelism);
this.totalUnhandledDataCount = 0;
}
// called once when the framework signals that processing is about to be ready
public override async Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
{
this.firstClient = SomeClient.CreateFromConnectionString(this.firstDownStreamConnectionString);
this.secondClient = SomeClient.CreateFromConnectionString(this.secondDownStreamConnectionString );
await Task.Yield();
}
// this is called repeatedly by the framework
// outside of startup and shutdown, it is the only entrypoint to my logic
public override async Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<Data> inputData)
{
ActionBlock<List<Data>> processorActionBlock = new ActionBlock<List<Data>>(
inputData =>
{
SomeData firstDataset = new SomeData();
SomeData secondDataset = new SomeData();
int unhandledDataCount = 0;
foreach (Data data in inputData)
{
// if data fits one set of criteria, put it in firstDataSet
// if data fits other set of criteria, put it in secondDataSet
// otherwise increment unhandledDataCount
}
Interlocked.Add(ref this.totalUnhandledDataCount, unhandledDataCount);
lock (this.firstClient)
{
try
{
firstDataset.SendData(this.firstClient);
} catch (Exception e)
{
lock(this.reportingClient)
{
this.reportingClient.LogTrace(e);
}
}
}
lock (this.secondClient)
{
try
{
secondDataset.SendData(this.secondClient);
} catch (Exception e)
{
lock(this.reportingClient)
{
this.reportingClient.LogTrace(e);
}
}
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = this.DegreeOfParallelism
});
// construct as many List<Data> from inputData as there is DegreeOfParallelism
// put that in a variable called batches
for(int i = 0; i < DegreeOfParallelism; i++)
{
processorActionBlock.Post(batches[i]);
}
processorActionBlock.Complete();
processorActionBlock.Completion.Wait();
await context.CheckpointAsync();
}
}
我试图只保留相关代码,我省略了处理逻辑、大多数指标收集、数据如何发送、关闭逻辑等。
我想利用一些允许重用性的Task。我不想为这种类型的所有正在运行的任务重用单个客户端连接,我也不希望每个任务在每次调用时都创建一个新的客户端连接。我确实希望每个类似线程的任务都有一组持久的客户端连接。理想情况下,我也不想在 System.Threading.Tasks.Dataflow 中创建一个包装 Task 或扩展抽象类/接口的新类。
【问题讨论】:
-
您是否希望将操作放入队列中?客户提出一个事件并继续前进。事件进入队列或导致一些其他操作被放置在队列中。现在您已经与客户端断开连接,可以以最有效的方式处理该队列,可能包括并行处理。
-
只需将
ActionBlock和ConcurrentDictionary用于客户端,是的,任何并行都有线程安全开销 /i> 方法,它只是野兽的本性 此外,Dataflow 非常棒,并且专为处理数据管道时的这种情况而构建 -
@ScottHannen 我刚刚更新了我的代码,所以也许它会更清楚我想要做什么。这种方法的主要问题是 MyClass 的每个实例都需要对其接收的数据按顺序调用
context.CheckpointAsync();。所以对 ProcessEventsAsync 的调用需要按顺序完成,并且要完成一个调用,我必须必须能够在上下文中调用 checkPoint -
@TheGeneral 我目前实际上正在使用 ActionBlocks。使用 ConcurrentDictionary 实际上并不是一个坏主意。有没有办法让 ActionBlock 实例知道它的 instanceid,或者我需要为处理结帐的客户端的 ConcurrentDictionary 实现一个包装器?
-
"有没有办法让 ActionBlock 实例知道它的 instanceid" 在这些情况下,我要么为块创建一个元组或一个结构,即
ActionBlock<(int id, Payload data)>或 @987654326 @ 或类似的,然后在处理时您天真地拥有有关该对象的信息,就此而言,您无论如何都可以将您的客户端传递给操作块
标签: c# multithreading asynchronous task tpl-dataflow