【问题标题】:How to restart a async method? Cancel previous run, await it and then start it如何重新启动异步方法?取消之前的运行,等待它然后启动它
【发布时间】:2019-06-03 09:44:39
【问题描述】:

我有一个方法RestartAsync,它启动了一个方法DoSomethingAsync。当再次调用RestartAsync 时,它应该取消DoSomethingAsync 并等待它完成(DoSomethingAsync 不能同步取消,并且不应在前一个任务仍在进行时调用它)。

我的第一个方法是这样的:

public async Task RestartTest()
{
    Task[] allTasks = { RestartAsync(), RestartAsync(), RestartAsync() } ;
    await Task.WhenAll(allTasks);
}

private async Task RestartAsync()
{
    _cts.Cancel();
    _cts = new CancellationTokenSource();
    await _somethingIsRunningTask;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

private static int _numberOfStarts;

private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
    _numberOfStarts++;
    int numberOfStarts = _numberOfStarts;

    try
    {
        Console.WriteLine(numberOfStarts + " Start to do something...");
        await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        Console.WriteLine(numberOfStarts + " Finished to do something...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine(numberOfStarts + " Cancelled to do something...");
    }
}

调用 RestartAsync 三次时的实际输出如下所示(注意第二次运行取消并等待第一次,但同时第三次运行也在等待第一次,而不是取消并等待第二次):

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
3 Start to do something...
2 Finished to do something...
3 Finished to do something...

但我想要实现的是这个输出:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Cancelled to do something...
3 Start to do something...
3 Finished to do something...

我目前的解决方案如下:

private async Task RestartAsync()
{
    if (_isRestarting)
    {
        return;
    }

    _cts.Cancel();
    _cts = new CancellationTokenSource();

    _isRestarting = true;
    await _somethingIsRunningTask;
    _isRestarting = false;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

然后我得到这个输出:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Finished to do something...

现在至少 DoSomethingAsync 仍在进行中时没有启动(请注意,忽略第三次运行,这并不重要,否则它应该取消第二次运行)。

但是这个解决方案感觉不好,我必须在任何我想要这种行为的地方重复这种丑陋的模式。 这种重启机制有什么好的模式或框架吗?

【问题讨论】:

    标签: c# .net concurrency async-await task-parallel-library


    【解决方案1】:

    我认为问题出在 RestartAsync 方法内部。请注意,如果异步方法要等待某些东西,它会立即返回一个任务,因此第二个 RestartAsync 实际上在它交换任务之前返回,然后第三个 RestartAsync 进来并首先等待任务 RestartAsync。

    此外,如果 RestartAsync 将由多个线程执行,您可能希望将 _cts 和 _somethingIsRunningTask 包装成一个并使用 Interlocked.Exchange 方法交换值以防止出现竞争条件。

    这是我的示例代码,未经过全面测试:

    public class Program
    {
        static async Task Main(string[] args)
        {
            RestartTaskDemo restartTaskDemo = new RestartTaskDemo();
    
            Task[] tasks = { restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ) };
            await Task.WhenAll( tasks );
    
            Console.ReadLine();
        }
    }
    
    public class RestartTaskDemo
    {
        private int Counter = 0;
    
        private TaskEntry PreviousTask = new TaskEntry( Task.CompletedTask, new CancellationTokenSource() );
    
        public async Task RestartAsync( int delay )
        {            
            TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    
            TaskEntry previousTaskEntry = Interlocked.Exchange( ref PreviousTask, new TaskEntry( taskCompletionSource.Task, cancellationTokenSource ) );
    
            previousTaskEntry.CancellationTokenSource.Cancel();
            await previousTaskEntry.Task.ContinueWith( Continue );
    
            async Task Continue( Task previousTask )
            {
                try
                {
                    await DoworkAsync( delay, cancellationTokenSource.Token );
                    taskCompletionSource.TrySetResult( true );
                }
                catch( TaskCanceledException )
                {
                    taskCompletionSource.TrySetCanceled();
                }
            }            
        }
    
        private async Task DoworkAsync( int delay, CancellationToken cancellationToken )
        {
            int count = Interlocked.Increment( ref Counter );
            Console.WriteLine( $"Task {count} started." );
    
            try
            {
                await Task.Delay( delay, cancellationToken );
                Console.WriteLine( $"Task {count} finished." );
            }
            catch( TaskCanceledException )
            {
                Console.WriteLine( $"Task {count} cancelled." );
                throw;
            }
        }
    
        private class TaskEntry
        {
            public Task Task { get; }
    
            public CancellationTokenSource CancellationTokenSource { get; }
    
            public TaskEntry( Task task, CancellationTokenSource cancellationTokenSource )
            {
                Task = task;
                CancellationTokenSource = cancellationTokenSource;
            }
        }
    }
    

    【讨论】:

    • 感谢这个解决方案和线程安全提示(尽管在我的例子中总是从 UI 线程调用重启)我不明白你为什么打电话给ContinueWith。为什么不等待previousTaskEntry.Task
    • 你可以用空的 try-catch 等待 previousTaskEntry.Task,它应该提供相同的结果。
    【解决方案2】:

    这是一个并发问题。因此,您需要一种解决并发问题的方法:信号量。

    在一般情况下,您还应该考虑正在运行的方法何时抛出OperationCanceledException

    private async Task DoSomethingAsync(CancellationToken cancellationToken)
    {
        _numberOfStarts++;
        int numberOfStarts = _numberOfStarts;
    
        try
        {
            Console.WriteLine(numberOfStarts + " Start to do something...");
            await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            Console.WriteLine(numberOfStarts + " Finished to do something...");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine(numberOfStarts + " Cancelled to do something...");
            throw;
        }
    }
    

    试试这个:

    private SemaphoreSlim semaphore = new SemaphoreSlim(1);
    private (CancellationTokenSource cts, Task task)? state;
    
    private async Task RestartAsync()
    {
        Task task = null;
    
        await this.semaphore.WaitAsync();
    
        try
        {
            if (this.state.HasValue)
            {
                this.state.Value.cts.Cancel();
                this.state.Value.cts.Dispose();
    
                try
                {
                    await this.state.Value.task;
                }
                catch (OperationCanceledException)
                {
                }
    
                this.state = null;
            }
    
            var cts = new CancellationTokenSource();
            task = DoSomethingAsync(cts.Token);
    
            this.state = (cts, task);
        }
        finally
        {
            this.semaphore.Release();
        }
    
        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
        }
    }
    

    【讨论】:

    • 感谢您的解决方案!虽然我更喜欢等待上一个任务,然后是信号量,因为它更好地代表了我想要实现的目标,就像这个答案中建议的那样:stackoverflow.com/a/54102382/3446784 但这是一个品味问题。我认为这也不是真正的并发问题,因为该方法在现实中总是从UI线程调用,因此不涉及其他线程。
    • 它正在等待所有任务。如果没有并发,则不需要取消之前的执行。
    猜你喜欢
    • 1970-01-01
    • 2010-10-11
    • 1970-01-01
    • 2016-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-14
    相关资源
    最近更新 更多