【问题标题】:Immediately cancelling blocking operation with timeout立即取消超时阻塞操作
【发布时间】:2014-09-14 20:57:29
【问题描述】:

我有一个从队列中读取的阻塞操作,但它可能需要超时。我可以轻松地将其转换为“异步”操作:

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        return await Task.Run(() =>
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Try receiving for one second
                IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));

                if (message != null)
                {
                    return message;
                }
            }
        }, cancellationToken).ConfigureAwait(false);
    }

中止线程通常被认为是不好的做法,因为您可能会泄漏资源,因此超时似乎是彻底停止线程的唯一方法。所以我有三个问题:

  1. “立即”取消的普遍接受的超时值是多少?
  2. 对于提供内置异步方法的库,是否确实存在立即取消,或者它们是否也使用超时和循环来模拟它?也许这里的问题是您将如何利用软件中断,以及这些是否还必须进行某种轮询以检查是否存在中断,即使它是在内核/CPU 级别。
  3. 是否有其他方法可以解决这个问题?

编辑:所以我可能已经用Thread.Interrupt() 找到了部分答案,然后处理ThreadInterruptedException。这基本上是内核级的软件中断,并且尽可能接近“立即”吗?以下是处理此问题的更好方法吗?

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var completionSource = new TaskCompletionSource<IMessage>();

        var receiverThread = new Thread(() =>
        {
            try
            {
                completionSource.SetResult(consumer.Receive());
            }
            catch (ThreadInterruptedException)
            {
                completionSource.SetCanceled();
            }
            catch (Exception ex)
            {
                completionSource.SetException(ex);
            }
        });

        cancellationToken.Register(receiverThread.Interrupt);

        receiverThread.Name = "Queue Receive";
        receiverThread.Start();

        return await completionSource.Task.ConfigureAwait(false);
    }

【问题讨论】:

  • 某些 API确实 支持立即取消,因为它们可以将线程置于等待状态而不是轮询。但是,如果您使用的 API 不支持 CancellationToken,那么您就会陷入轮询。在这种情况下,超时没有“普遍接受的”值,因为这是特定于应用程序的......中断不是内核级别的,本机端没有这样的事情。抛出异常的原因是为了通知调用者取消,如果一个空消息足够好,你可能不需要抛出。
  • @PeterRitchie:通过进一步的测试,在我的情况下,API 确实将线程放入 WaitSleepJoin 并且Thread.Interrupt 只会中断处于该状态的线程。事实上,如果我在启动线程之前中断,线程会一直运行到Receive(),然后抛出ThreadInterruptedException。在我看来,我的场景中的优先顺序是CancellationTokenThread.Interrupt,超时阻塞(大部分时间你在监听接收),轮询(不阻塞接收,大部分时间你在睡觉) )。 Thread.Interrupt 对我来说似乎是第一个可行的案例。
  • 可能,但很难判断调用中断对消费者实例造成了什么损害。
  • @PeterRitchie:好点...Thread.Interrupt 似乎只有在我所有的工作线程都处于 WaitSleepJoin 时才会干净地关闭。如果工作实际上正在完成,我的连接对象将在所有线程中失效,所以我在其他意外区域得到ThreadInterruptedException。 :( 看起来我回到了超时的阻塞接收状态。我想我只能在控制所有代码时可靠地使用Thread.Interrupt
  • 是的,我认为超时是最安全的选择

标签: c# .net multithreading async-await blocking


【解决方案1】:
  1. 这取决于您的具体需求。一秒钟对某些人来说可能是立即的,而对另一些人来说可能很慢。
  2. 提供async API 的库(好的库)自下而上执行此操作。它们通常不会用线程包装阻塞(同步)操作以使它们看起来是异步的。他们使用TaskCompletionSource 来创建真正的async 方法。
  3. 我不确定您所说的队列是什么意思(.Net 中的内置 Queue 没有 Receive 方法)但您可能应该使用真正的 async 数据结构,例如 @987654329 @的BufferBlock

关于您的特定代码示例。 您在整个操作过程中都在挂起一个线程(即async over sync),这很昂贵。相反,您可以尝试快速消耗,然后异步等待超时结束,或者 CancellationToken 被取消。 在Task.Run 中使用另一个线程也没有任何意义。你可以简单地让async lambda 成为ReceiveAsync 的内容:

public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // Try receiving for one second
        IMessage message;
        if (!consumer.TryReceive(out message))
        {
             await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
        if (message != null)
        {
            return message;
        }
    }
}

如果您的队列实现IDisposable,则在取消CancellationToken 时,另一个(更苛刻)选项将调用DisposeHere's how.

【讨论】:

    猜你喜欢
    • 2015-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-26
    • 2011-02-21
    • 1970-01-01
    相关资源
    最近更新 更多