【问题标题】:How can I serialize thread execution across a task continuation block?如何跨任务继续块序列化线程执行?
【发布时间】:2018-10-02 07:15:44
【问题描述】:

在下面的示例中,对 HandleChangesAsync 的调用是从异步任务中并通过事件处理程序进行的。

问题 - 有没有办法确保一次只有一个线程可以执行 HandleChangesAsync create_task + 任务继续块(即使任务继续块调用其他异步函数)?

请注意,我不能只使用同步原语,因为 HandleChangesAsync 可以在异步操作完成之前返回。

void MyClass::DoStuffAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreStuffAsync())
        .then([weakThis]())
    {
        auto strongThis = weakThis.Resolve<HomePromotionTemplateViewModel>();
        if (strongThis)
        {
            strongThis->RegisterForChanges();
            strongThis->HandleChangesAsync();
        }
    });
}

void MyClass::RegisterForChanges()
{    
    // Attach event handler to &MyClass::OnSomethingChanged
}

void MyClass::OnSomethingChanged()
{
    HandleChangesAsync();
}

void MyClass::HandleChangesAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreCoolStuffAsync(m_needsProtection))
        .then([weakThis]()
    {
        // do async stuff 
        // update m_needsProtection
    })
        .then([weakThis]()
    {
        // do more async stuff 
        // update m_needsProtection
    });
}

【问题讨论】:

  • 与其尝试在许多异步操作中兼顾线程保护,不如在需要同步的数据周围放置一个互斥锁?
  • @vu1p3n0x - 我已经调整了代码示例。 1)我真的想知道在这种情况下是否可以进行线程同步。 2)如果我确实使用了互斥锁,我该如何在上面的代码示例中实际调用锁呢?
  • 一目了然,代码要达到什么目的并不清楚。 X是什么? en.wikipedia.org/wiki/XY_problem 不管是什么,我怀疑有一个更简单的解决方案。
  • @JiveDadson - 我只是想了解是否可以通过 create_task 和延续来同步执行异步操作的代码块(在本例中为 HandleChangesAsync() 的主体)。如果所有操作都是同步的,我会用锁包围它们并完成。但这不是异步的选项,所以我想知道是否有替代方案。
  • 我的问题是你为什么要这样做?您要解决的根本问题是什么?

标签: uwp task c++-cx ppl


【解决方案1】:

假设您只想忽略重叠请求(而不是将它们排队),这很容易使用原子布尔值完成。可以将以下内容粘贴到新的 XAML 页面中,然后检查输出窗口。有两个线程竞相调用DoStuff(),但每次只有一个线程会执行——在工作完成时value_ 的值将始终是016;从来没有别的。如果多个线程同时执行该工作,您可能会得到其他数字。

有一堆愚蠢的代码可以“工作”,但基本在 compare_exchange_strong() call 中。它基本上说“我希望busy_ 的值是false,在这种情况下将busy_ 更新为true 并返回true(在这种情况下,我将开始工作)。但如果busy_ 的值 不是 已经 false 然后返回 false (我不会做任何工作)”。 注意if 中的逻辑否定! :-)

我不是内存排序方面的专家,因此如果您在紧密循环中运行,可能有一种更有效的方法(即传递明确的memory_order 值),但它应该是正确的并且应该正常的用户体验工作就足够了:

#include <atomic>
#include <ppltasks.h>
#include <string>

using namespace concurrency;

class Test
{
  std::atomic<bool> busy_;
  int value_;

  task<int> AddNumbersAsync(int x, int y)
  {
    return task<int>{[x, y]
    {
      Sleep(20);
      return x + y;
    }};
  }

  void CompleteStuff()
  {
    OutputDebugStringA("** Done with work; value is ");
    OutputDebugStringA(std::to_string(value_).c_str());
    OutputDebugStringA("\r\n");
    busy_ = false;
  }

public:
  Test()
    : busy_{ false }, value_{ 0 }
  {}

  void DoStuff()
  {
    // ---
    // This is where the magic happens...
    // ---
    bool expected{ false };
    if (!busy_.compare_exchange_strong(expected, true))
    {
      OutputDebugStringA("Work already in progress; bailing.\r\n");
      return;
    }

    OutputDebugStringA("Doing work...\r\n");
    value_ = 2;

    try
    {
      AddNumbersAsync(value_, value_).then([this](int i)
      {
        value_ = i;
        return AddNumbersAsync(value_, value_);
      }).then([this](int i)
      {
        value_ = i;
        return AddNumbersAsync(value_, value_);
      }).then([this](task<int> i)
      {
        // ---
        // Handle any async exceptions
        // ---
        try
        {
          value_ = i.get();
        }
        catch (...)
        {
          OutputDebugStringA("Oops, an async exception! Resetting value_\r\n");
          value_ = 0;
        }
        CompleteStuff();
      });
    }
    // ---
    // Handle any sync exceptions
    // ---
    catch (...)
    {
      OutputDebugStringA("Oops, an exception! Resetting value_\r\n");
      value_ = 0;
      CompleteStuff();
    }
  }
};

Test t;

task<void> TestTask1;
task<void> TestTask2;

MainPage::MainPage()
{
  InitializeComponent();
  TestTask1 = task<void>([]
  {
    for (int i = 0; i < 100; i++)
    {
      t.DoStuff();
      Sleep(20);
    }
  });

  TestTask1 = task<void>([]
  {
    for (int i = 0; i < 100; i++)
    {
      t.DoStuff();
      Sleep(30);
    }
  });
}

请注意,我们需要同时捕获同步异常(可能由任务链的第一部分引起的异常)和异步异常(由其中一个任务或其延续引起的异常)。我们通过使用task&lt;int&gt; 类型的延续来捕获异步异常,然后通过在任务上显式调用get() 来检查异常。在这两种情况下,我们都会重置 value_busy_ 标志。

您应该会看到一些输出,如下所示;您可以看出多个线程正在相互竞争,因为有时OutputDebugString 调用会交错:

Doing work...
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
Work already in progress; bailing.
** Done with work; value is Work already in progress; bailing.
16

【讨论】:

    猜你喜欢
    • 2018-02-28
    • 2013-02-13
    • 1970-01-01
    • 2017-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多