【问题标题】:WaitUntil IF CondWaitUntil IF 条件
【发布时间】:2021-02-18 01:32:58
【问题描述】:

我正在使用并行 Foreach。我也在foreach中调用webapi。我想控制来自 api 的响应。如果我得到真值或其他值,我想完成所有任务和 foreach 迭代。

我在 stackoverflow 中搜索,但看不到任何响应。

Task<bool>[] tasks = new Task<bool>[customers.Count()];
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

Parallel.ForEach(customers, po, async (customer, state, index) =>
{
    po.CancellationToken.ThrowIfCancellationRequested();
    var filePath = Path.Combine(_hostingEnvironment.WebRootPath, "Photos", $"{customer.CustomerNumber}.jpg");
    byte[] secondImageBytes = await System.IO.File.ReadAllBytesAsync(filePath).ConfigureAwait(false);
    ByteArrayContent secondImage = new ByteArrayContent(secondImageBytes);

    var test = await SomeFunction().ConfigureAwait(false);
    if (test)
    {
        //cancel all foreach task
    }
});

为了理解,测试代码:

  static async Task Main(string[] args)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<TaskItem> taskList = new();
        taskList.Add(new TaskItem { Id = 1, Delay = 100, isOkey = true, Name = "Try 1" });
        taskList.Add(new TaskItem { Id = 2, Delay = 2000, isOkey = false, Name = "Try 2" });
        taskList.Add(new TaskItem { Id = 3, Delay = 100, isOkey = false, Name = "Try 3" });
        taskList.Add(new TaskItem { Id = 4, Delay = 100, isOkey = false, Name = "Try 4" });
        taskList.Add(new TaskItem { Id = 5, Delay = 100, isOkey = false, Name = "Try 5" });
        taskList.Add(new TaskItem { Id = 6, Delay = 200, isOkey = false , Name = "Try 6" });
        taskList.Add(new TaskItem { Id = 7, Delay = 10000, isOkey = false, Name = "Try 7" });
        taskList.Add(new TaskItem { Id = 8, Delay = 10000, isOkey = false, Name = "Try 8" });
        taskList.Add(new TaskItem { Id = 9, Delay = 10000, isOkey = false, Name = "Try 9" });
        var control=await TestTask2Async(taskList);
        Console.WriteLine("Result ="+control);
        watch.Stop();
        Console.WriteLine(watch.Elapsed.TotalSeconds.ToString());
        Console.ReadKey();
    }
    public static async Task<int> TestTaskAsync(List<TaskItem> taskList)
    {       
        var matchedId = 0;       
        try
        {
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
            var SomeTask = Task.Factory.StartNew(async () =>
            {               
                await Task.WhenAll(taskList.AsEnumerable().Select(async item =>
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(100);
                    await Task.Delay(1000);
                    await Task.Delay(item.Delay);                        
                    Console.WriteLine("Process for " + item.Id);
                    if (item.isOkey)
                    {
                        Console.WriteLine("Founded " + item.Id);
                        matchedId = item.Id;                            
                        tokenSource.Cancel();                            
                    }
                }));
            },token);
            await SomeTask.Result;
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
        }
        return matchedId;
    }


    public static async Task<int> TestTask2Async(List<TaskItem> taskList)
    {
        var matchedId = 0;
        try
        {
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
            
            await Task.WhenAll(taskList.AsEnumerable().Select(async item =>
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(100).ConfigureAwait(false); 
                await Task.Delay(1000).ConfigureAwait(false); 
                await Task.Delay(item.Delay).ConfigureAwait(false); 
                Console.WriteLine("Process for " + item.Id);
                if (item.isOkey)
                {
                    Console.WriteLine("Founded " + item.Id);
                    matchedId = item.Id;
                    tokenSource.Cancel();
                }
            }));
        }
        catch (Exception)
        {
        }
        return matchedId;
    }
  public class TaskItem
{
    public int Id { get; set; }
    public int Delay { get; set; }
    public string Name { get; set; }
    public bool isOkey { get; set; }
}
  • 结果:

    4 的流程

    5 的过程

    1 的过程

    3 的过程

    创立 1

    6 的过程

    2 的过程

    7 的流程

    9 的流程

    8 进程

    结果 =1

    11.2402357

但我想如果成立(bla),所有任务都会死,我不想看到成立后(bla)的流程。

对于这个例子,我只想看看:

4 的流程

5 的过程

1 的过程

3 的过程

创立 1

结果 =1

1.2402357

【问题讨论】:

    标签: .net .net-core async-await parallel-processing parallel.foreach


    【解决方案1】:

    Parallel.ForEach 不是为与async-await 一起使用而设计的。

    一旦所有项目达到第一个未完成的awaitParallel.ForEach 就会终止。

    试试这个:

    var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;
    
    await Task.WhenAll(customers.Select(async customer =>
    {
        token.ThrowIfCancellationRequested();
        var filePath = Path.Combine(_hostingEnvironment.WebRootPath, "Photos", $"{customer.CustomerNumber}.jpg");
        var secondImageBytes = await File.ReadAllBytesAsync(filePath).ConfigureAwait(false);
        var secondImage = new ByteArrayContent(secondImageBytes);
    
        var test = await SomeFunction().ConfigureAwait(false);
        if (test)
        {
            tokenSource.Cancel();
        }
    }));
    

    另外,不要将Task.Factory.StartNewasync-await 一起使用。请改用Task.Run

    关于取消令牌,您需要了解的一件事是它表示取消的意图,但它本身并不会取消任何内容。代码需要检查它并以任何需要的方式取消。

    试试这个:

    public static async Task<int> TestTask2Async(List<TaskItem> taskList)
    {
        var matchedId = 0;
        try
        {
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
    
            await Task.WhenAll(taskList.AsEnumerable().Select(async item =>
            {
                token.ThrowIfCancellationRequested();
                // add token to the call to Task.Delay
                await Task.Delay(100, token).ConfigureAwait(false);
                await Task.Delay(1000, token).ConfigureAwait(false);
                await Task.Delay(item.Delay, token).ConfigureAwait(false);
                // check it again
                token.ThrowIfCancellationRequested();
                Console.WriteLine("Process for " + item.Id);
                if (item.isOkey)
                {
                    // cancel as soon as the condition has been met
                    tokenSource.Cancel();
                    Console.WriteLine("Founded " + item.Id);
                    matchedId = item.Id;
                }
            }));
        }
        catch (Exception ex)
        {
        }
        return matchedId;
    }
    

    【讨论】:

    • 可以在WhenAll参数中使用List吗?现在我已经有一个错误。像这样:错误 CS1660 无法将 lambda 表达式转换为类型 'IEnumerable' 因为它不是委托类型
    • 感谢您的支持。但是在异步函数中我不能使用 File.ReadAllBytesAsync(filePath).ConfigureAwait(false);这种方法。 “ControllerBase.File(byte[], string)' 是一种方法,在给定的上下文中无效。”。另外,如果我写 "System.IO.File.ReadAllBytesAsync" ,我也会收到错误。
    • 我有这样的错误:“System.OperationCanceledException:操作被取消。 在 System.Threading.CancellationToken.ThrowOperationCanceledException() ; 在“
    • 那是因为取消令牌被取消了。这不是你想要的吗?
    • 我添加了示例代码并希望得到结果
    猜你喜欢
    • 1970-01-01
    • 2016-03-08
    • 2020-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多