【问题标题】:How to make Dispose await for all async methods?如何让 Dispose 等待所有异步方法?
【发布时间】:2020-11-30 02:34:31
【问题描述】:

我有带有异步方法的一次性类。

class Gateway : IDisposable {
  public Gateway() {}
  public void Dispose() {}

  public async Task<Data> Request1 () {...}
  public async Task<Data> Request2 () {...}
  public async Task<Data> Request3 () {...}
}

我需要 Dispose 等待,直到所有正在运行的请求都完成。

那么,要么我需要跟踪所有正在运行的任务,要么使用来自 AsyncEx 的 AsyncLock,或者其他什么?

更新

据我所知,有人害怕阻止 Dispose。然后我们可以创建Task WaitForCompletionAsync()Task CancelAllAsync() 方法。

【问题讨论】:

  • 我不认为你可以。您必须依靠调用者在using 语句中正确地await 您的async 方法。详情见this blog post
  • 这是一个不应该出现的问题。但是这里没有足够的东西来设计更好的东西。
  • 确实,我认为你需要重新考虑你的设计,或者至少展示一下你是如何使用它的

标签: c# .net multithreading asynchronous async-await


【解决方案1】:

目前,您必须添加用户必须调用的 CloseAsync 方法。

C# 8.0 发布后,您可以依赖IAsyncDisposable Interface 及其语言支持:

await using (var asyncDisposable = GetAsyncDisposable())
{
    // ...
} // await asyncDisposable.DisposeAsync()

【讨论】:

  • 我不确定 ServiceProvider 是否支持 IAsyncDisposable。无论如何,这是另一个问题。
  • 那完全是另一个问题。它还需要支持异步创建依赖项。
  • 我现在正在一个新项目中采用这个。谢谢!
【解决方案2】:

这是可重用异步处理支持的解决方案。由于 .NET Core 3.0 尚未发布,我将提供当前 C# 版本 (7.3) 和 beta (8.0) 的代码。

一旦在对象上调用IDisposable.Dispose(),它就不会阻塞,并确保所有任务完成后立即处理。

源代码(当前 C# 版本,不含IAsyncDisposable

使用相关:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

所有跟踪任务完成后可以处理的界面:

public interface ITrackingDisposable : IDisposable
{
    //The implementation of the actual disposings
    Task FinishDisposeAsync();
}

跟踪所有正在运行的任务并在适当的时间调用延迟处理的处理器:

public class TrackingDisposer : IDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    public bool IsDisposed { get; private set; } = false;

    //The supported class must implement ITrackingDisposable
    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    //Add a task to the tracking list, returns false if disposed
    //Without return value
    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
            }

            result = ending();
        }
        return true;
    }

    //With return value
    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    //The entry of applying for dispose
    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
        }
    }
}

一个简化实现的基类:

public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

演示和测试输出

测试类:

internal sealed class TestDisposingObject : TrackingDisposable
{
    public Task Job0Async() => Track(async () =>
    {
        await Task.Delay(200);
        Console.WriteLine("Job0 done.");
    });

    public Task<string> Job1Async(int ms) => Track(async () =>
    {
        await Task.Delay(ms);
        return "Job1 done.";
    });

    protected override void FinishDispose()
    => Console.WriteLine("Disposed.");
}

主要:

internal static class Program
{
    private static async Task Main()
    {
        var result0 = default(Task);
        var result1 = default(Task);
        var obj = new TestDisposingObject();
        result0 = obj.Job0Async();
        result1 = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        obj.Dispose();
        Console.WriteLine("Waiting For jobs done...");
        await Task.WhenAll(result0, result1);
    }
}

输出:

Waiting For jobs done...
Job1 done.
Job0 done.
Disposed.

附加,C# 8.0(带有IAsyncDisposable

将类型定义替换为:

public interface ITrackingDisposable : IDisposable, IAsyncDisposable
{
    Task FinishDisposeAsync();
}

public class TrackingDisposer : IDisposable, IAsyncDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    private readonly TaskCompletionSource<object> _disposing = new TaskCompletionSource<object>();

    public bool IsDisposed { get; private set; } = false;

    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
            }

            result = ending();
        }
        return true;
    }

    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
            _disposing.SetResult(null);
        }
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return new ValueTask(_disposing.Task);
    }
}

public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    public ValueTask DisposeAsync() => _disposer.DisposeAsync();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

测试主要:

internal static class Program
{
    private static async Task Main()
    {
        await using var obj = new TestDisposingObject();
        _ = obj.Job0Async();
        _ = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        Console.WriteLine("Waiting For jobs done...");
    }
}

【讨论】:

    【解决方案3】:

    这里的问题是Dispose()(还没有)的异步版本。所以你必须问自己——当你调用Dispose(),或者using 块结束时,你期望会发生什么......?换句话说,要求是什么?

    您可以要求Dispose 等待所有未完成的任务,然后执行其工作。但是 Dispose 不能使用await(它不是异步的)。它可以做的最好的事情是调用Result 来强制任务完成,但这将是一个阻塞调用,如果任何异步任务正在等待其他任何东西,它很容易死锁。

    相反,我建议以下要求:当调用者调用Dispose() 时,调用将标记网关被释放,然后立即返回,确保在最后一个任务完成后释放机制将自动激活.

    如果该要求足够,它可能的,但有点混乱。方法如下:

    1. 每次调用方法(例如Request)时,将返回的任务“包装”在另一个任务中,其中包括检查调用者是否已请求释放网关。

    2. 如果已请求处置,请立即处置,然后将任务标记为已完成。因此,当调用者等待任务时,它将强制处置。

    这是我的实现。我告诉过你这很丑。

    class Gateway : IDisposable 
    {
        protected readonly HttpClient _client = new HttpClient();  //an inner class that must be disposed when Gateway disposes
        protected bool _disposalRequested = false;
        protected bool _disposalCompleted = false;
        protected int _tasksRunning = 0;
    
    
        public void Dispose()
        {
            Console.WriteLine("Dispose() called.");
            _disposalRequested = true;  
            if (_tasksRunning == 0)
            {
                Console.WriteLine("No running tasks, so disposing immediately.");
                DisposeInternal();
            }
            else
            {
                Console.WriteLine("There are running tasks, so disposal shall be deferred.");
            }
        }
    
        protected void DisposeInternal()
        {
            if (!_disposalCompleted)
            {
                Console.WriteLine("Disposing");
                _client.Dispose();
                _disposalCompleted = true;
            }
        }
    
        protected async Task<T> AddDisposeWrapper<T>(Func<Task<T>> func)
        {
            if (_disposalRequested) throw new ObjectDisposedException("Disposal has already been requested. No new requests can be handled at this point.");
    
            _tasksRunning++;
            var result = await func();
            _tasksRunning--;
            await DisposalCheck();
            return result;
        }
    
        protected async Task DisposalCheck()
        {
            if (_disposalRequested) DisposeInternal();
        }
    
        public Task<Data> Request1()
        {
            return AddDisposeWrapper
            (
                Request1Internal
            );
        }
    
        public Task<Data> Request2()
        {
            return AddDisposeWrapper
            (
                Request2Internal
            );
        }
    
        protected async Task<Data> Request1Internal()
        {
            Console.WriteLine("Performing Request1 (slow)");
            await Task.Delay(3000);
            Console.WriteLine("Request1 has finished. Returning new Data.");
            return new Data();
        }
    
        protected async Task<Data> Request2Internal()
        {
            Console.WriteLine("Performing Request2 (fast)");
            await Task.Delay(1);
            Console.WriteLine("Request2 has finished. Returning new Data.");
            return new Data();
        }
    }
    

    这是一些测试代码:

    public class Program
    {
        public static async Task Test1()
        {
            Task<Data> task;
            using (var gateway = new Gateway())
            {
                task = gateway.Request1();
                await Task.Delay(1000);
            }
            var data = await task;
            Console.WriteLine("Test 1 is complete.");
        }
    
        public static async Task Test2()
        {
            Task<Data> task;
            using (var gateway = new Gateway())
            {
                task = gateway.Request2();
                await Task.Delay(1000);
            }
            var data = await task;
            Console.WriteLine("Test 2 is complete.");
        }
    
        public static async Task MainAsync()
        {
            await Test1();
            await Test2();
        }
    
        public static void Main()
        {
            MainAsync().GetAwaiter().GetResult();
            Console.WriteLine("Run completed at {0:yyyy-MM-dd HH:mm:ss}", DateTime.Now);
        }
    }
    

    这是输出:

    Performing Request1 (slow)
    Dispose() called.
    There are running tasks, so disposal shall be deferred.
    Request1 has finished. Returning new Data.
    Disposing
    Test 1 is complete.
    Performing Request2 (fast)
    Request2 has finished. Returning new Data.
    Dispose() called.
    No running tasks, so disposing immediately.
    Disposing
    Test 2 is complete.
    Run completed at 2019-05-15 00:34:46
    

    这是我的 Fiddle,如果您想尝试一下:Link

    我真的不建议这样做(如果要处理某些东西,您应该更好地控制它的生命周期),但是为您编写这段代码很有趣。

    注意:由于使用了引用计数,因此需要进行额外的工作才能使此解决方案成为线程安全的,或者使其能够适应 Gateway 的请求方法之一引发异常的情况。

    【讨论】:

      【解决方案4】:

      处理和等待完成是不同的事情。所以,我宁愿在任务仍在运行时抛出异常。

      我用Nito.AsyncEx.AsyncConditionVariable 编写了示例。我没有测试它,但我认为它应该可以工作。只需使用Completion.WaitAsync()

      另外我推荐这篇文章:https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html

      class Gateway : IDisposable {
      
        private int runningTaskCount;
        public AsyncConditionVariable Completion { get; } = new AsyncConditionVariable( new AsyncLock() );
      
        public Gateway() {
        }
        public void Dispose() {
          if (runningTaskCount != 0) throw new InvalidOperationException( "You can not call this method when tasks are running" );
        }
      
        public async Task<Data> Request1 () {
          BeginTask();
          ...
          EndTask();
        }
      
        private void BeginTask() {
          Interlocked.Increment( ref runningTaskCount );
        }
        private void EndTask() {
          var result = Interlocked.Decrement( ref runningTaskCount );
          if (result == 0) Completion.NotifyAll();
        }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-02-22
        • 1970-01-01
        • 2013-02-15
        • 2013-07-24
        相关资源
        最近更新 更多