【问题标题】:If each thread needs to resume work, whenever any thread finds some new information, how to wait until all threads have finished?如果每个线程都需要恢复工作,每当任何一个线程发现一些新信息时,如何等到所有线程都完成?
【发布时间】:2021-12-30 21:03:48
【问题描述】:

一个看似简单的同步问题

TL;DR

多个线程相互依赖。每当他们中的一个人发现一些新信息时,他们都需要处理这些信息。如何确定所有线程都准备好了?

背景

我已经(几乎)并行化了一个函数Foo(input) 来解决一个问题,它被称为P-complete,可能被认为是某种类型的搜索。不出所料,到目前为止,没有人成功地利用两个线程之外的并行性来解决这个问题。然而,我有一个很有前途的想法并设法完全实现它,除了这个看似简单的问题。

详情

每个线程之间的信息是使用某种G 类型的共享图形数据库g 隐式交换的,这样线程就可以立即获得所有信息,并且不需要显式地相互通知。更准确地说,每次某个线程找到i 信息时,该线程都会调用线程安全函数g.addInformation(i),其中基本上将信息i 放在某个数组的末尾。我的新实现的一个方面是,线程可以在搜索过程中使用i 信息,甚至在i 被排入数组末尾之前。然而,每个线程需要在信息i 入队后单独处理该信息。在添加i 的线程从g.addInformation(i) 返回之后,可能会排队i。这是因为其他一些线程可能会接管对i 进行排队的责任。

每个线程s 调用一个函数s.ProcessAllInformation() 以便按顺序处理g 中该数组中的所有信息。某个线程对s.ProcessAllInformation 的调用是noop,即如果该线程已经处理了所有信息或没有(新)信息,则什么也不做。

一旦一个线程处理完所有信息,它应该等待所有其他线程完成。如果任何其他线程发现一些新信息i,它应该会恢复工作。 IE。每次某些线程调用g.addInformation(i) 时,所有已完成处理所有先前已知信息的线程都需要恢复其工作并处理该(以及任何其他)新添加的信息。

我的问题

我能想到的任何解决方案都行不通,并且会遇到相同问题的变体:一个线程完成了所有信息的处理,然后看到所有其他线程也都准备好了。因此,该线程离开。但随后另一个线程注意到添加了一些新信息,恢复工作并找到新信息。然后已经离开的线程不会处理新信息。

这个问题的解决方案可能是直截了当的,但我想不出一个。理想情况下,此问题的解决方案不应依赖于每当找到新信息时对 g.addInformation(i) 的函数调用期间的耗时操作,因为这种情况预计每秒出现多少次(每秒 1 或 2 百万次) ,见下文)。

更多背景

在我最初的顺序应用程序中,函数 Foo(input) 在现代硬件上大约每秒调用 100k 次,我的应用程序花费 80% 到 90% 的时间执行 Foo(input)。实际上,所有对Foo(input) 的函数调用都是相互依赖的,我们以一种迭代的方式在一个非常大的空间中搜索一些东西。使用应用程序的顺序版本时,解决一个合理规模的问题通常需要大约一两个小时。

每次调用Foo(input) 时,都会在零到数百个新信息之间找到。在我的应用程序执行期间,平均每秒发现 1 或 200 万条信息,即我们在对 Foo(input) 的每个函数调用中发现 10 到 20 条新信息。所有这些统计数据可能都有非常高的标准偏差(不过我还没有测量)。

目前我正在为 go 中的Foo(input) 的并行版本编写原型。我更喜欢go中的答案。顺序应用程序是用 C 编写的(实际上它是 C++,但它像 C 中的程序一样编写)。所以 C 或 C++(或伪代码)的答案是没有问题的。我还没有对我的原型进行基准测试,因为错误代码比慢代码慢得多。

代码

此代码示例是为了澄清。由于我还没有解决问题,请随时考虑对代码进行任何更改。 (我也很欣赏无关的有用评论。)

全球形势

我们有一些类型GFoo()G 的一个方法。如果gG 类型的对象,并且当g.Foo(input) 被调用时,g 会创建一些工作人员s[1],...,s[g.numThreads],它们获得指向g 的指针,这样它们具有访问g 的成员变量,并能够在发现新信息时调用g.addInformation(i)。然后为每个工作人员s[j] 并行调用一个方法FooInParallel()

type G struct {
  s           []worker
  numThreads  int

  // some data, that the workers need access to
}

func (g *G) initializeWith(input InputType) {
  // Some code...
}

func (g *G) Foo(input InputType) int {
  // Initialize data-structures:
  g.initializeWith(input)

  // Initialize workers:
  g.s := make([]worker, g.numThreads)
  for j := range g.s {
    g.s[j] := newWorker(g) // workers get a pointer to g
  }

  // Note: This wait group doesn't solve the problem. See remark below.
  wg := new(sync.WaitGroup)
  wg.Add(g.numThreads)
 
  // Actual computation in parallel:
  for j := 0 ; j < g.numThreads - 1 ; j++ {
    // Start g.numThread - 1 go-routines in parrallel
    go g.s[j].FooInParallel(wg)
  }

  // Last thread is this go-routine, such that we have
  // g.numThread go-routines in total.
  g.s[g.numThread-1].FooInParallel(wg)

  wg.Wait()
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.
  // Step 3: Possibly, call g.notifyAll()
}

// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
func (g *G) notifyAll() {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding.
   // section. It doesn't work, though.
}

// If a thread has finished processing all information
// it must ensure that all threads have finished and
// that no new information have been added since.
func (g *G) allThreadsReady() bool {
   // TODO:
   // This is what I fail to accomplish. I include
   // my most successful attempt in the corresponding.
   // section. It doesn't work, though.
}

备注: 等待组的唯一目的是确保在最后一个工作人员返回之前不会再次调用 Foo(input)。但是,您可以完全忽略这一点。

当地情况

每个工作线程都包含一个指向全局数据结构的指针,并搜索宝藏或新信息,直到它处理完所有已被此线程或其他线程排队的信息。如果找到新信息i,它会调用函数g.addInformation(i) 并继续搜索。如果它找到宝藏,它会通过它作为参数获得的通道发送宝藏并返回。如果所有线程都准备好处理所有信息,则每个线程都可以向通道发送一个虚拟宝藏并返回。但是,确定是否所有线程都准备好了正是我的问题。

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()
    
    // The following is the problem. Feel free to make any 
    // changes to the following block.
    s.notifyAll()
    for !s.needsToResumeWork() {
      if s.allThreadsReady() {
        return
      }
    }

  }
}

func (s *worker) notifyAll() {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn't work, though.

  // An example: 
  // Step 1: Possibly, do something else first.
  // Step 2: Call g.notifyAll()
}

func (s *worker) needsToResumeWork() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn't work, though.
}

func (s *worker) allThreadsReady() bool {
  // TODO:
  // This is what I fail to accomplish. I include
  // my most successful attempt in the corresponding.
  // section. It doesn't work, though.

  // If all threads are ready, return true. 
  // Otherwise, return false.

  // Alternatively, spin as long as no new information
  // has been added, and return false as soon as some
  // new information has been added, or true if no new
  // information has been added and all other threads
  // are ready.
  // 
  // However, this doesn't really matter, because a 
  // function call to processAllInformation is cheap
  // if no new informations are available.
}

// A call to this function is cheap if no new work has
// been added since the last function call.
func (s *worker) processAllInformation() treasureType {
  // Access member variables of g and search
  // for information or treasures. 

  // If a new information i is found, calls the
  // function g.addInformation(i).

  // If all information that have been enqueued to
  // g have been processed by this thread, returns.
}

我解决问题的最佳尝试

嗯,到现在为止,我已经很累了,所以我可能需要稍后再检查我的解决方案。但是,即使我的correct 尝试也不起作用。因此,为了让您了解我迄今为止一直在尝试的事情(以及许多其他事情),我立即分享。

我尝试了以下方法。每个工人都包含一个成员变量needsToResumeWork,只要添加了新信息,它就会自动设置为一个。多次将此成员变量设置为 1 并没有什么坏处,重要的是线程在添加最后一个信息后恢复工作。

为了减少每当找到信息i时调用g.addInformation(i)的线程的工作量,而不是单独通知所有线程,排队信息的线程(不一定是调用g.addInformation(i)的线程) 之后将g的成员变量notifyAllFlag设置为1,表示需要通知所有线程最新信息。

每当处理完所有已入队信息的线程调用函数g.notifyAll() 时,它都会检查成员变量notifyAllFlag 是否设置为1。如果是这样,它会尝试以原子方式将g.allInformedFlag 与 1 进行比较并与 0 进行交换。如果它无法写入 g.allInformedFlag,它会假定某个其他线程已负责通知所有线程。如果此操作成功,则此线程已接管通知所有线程的责任,并通过将成员变量needsToResumeWorkFlag 设置为每个线程一个来继续这样做。然后它自动将g.numThreadsReadyg.notifyAllFlag 设置为零,将g.allInformedFlag 设置为1。

type G struct {
  numThreads       int
  numThreadsReady      *uint32 // initialize to 0 somewhere appropriate
  notifyAllFlag        *uint32 // initialize to 0 somewhere appropriate
  allInformedFlag      *uint32 // initialize to 1 somewhere appropriate (1 is not a typo)

  // some data, that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.

  // Since the responsibility to enqueue an information may
  // be passed to another thread, it is important that the
  // last step is executed by the thread which enqueues the 
  // information(s) in order to ensure, that the information
  // successfully has been enqueued.

  // Step 3:
  atomic.StoreUint32(g.notifyAllFlag,1)        // all threads need to be notified
}

// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAll) == 1 {
    // Somebody needs to notify all threads.
    if atomic.CompareAndSwapUint32(g.allInformedFlag, 1, 0) {
      // This thread has taken over the responsibility to inform
      // all other threads. All threads are hindered to access 
      // their member variable s.needsToResumeWorkFlag
      for j := range g.s {
        atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
      }
      atomic.StoreUint32(g.notifyAllFlag, 0)
      atomic.StoreUint32(g.numThreadsReady, 0)
      atomic.StoreUint32(g.allInformedFlag, 1)
    } else {
      // Some other thread has taken responsibility to inform
      // all threads. 
  }
}

当一个线程完成处理所有已入队的信息时,它会通过将其成员变量needsToResumeWorkFlag 与 1 和 0 交换进行原子比较来检查是否需要恢复工作。但是,由于其中一个线程负责通知所有其他人,它不能立即这样做。

首先,它必须调用函数g.notifyAll(),然后它必须检查最近调用g.notifyAll()的线程是否完成通知所有线程。因此,在调用g.notifyAll() 之后,它必须旋转直到g.allInformed 为1,然后才检查其成员变量s.needsToResumeWorkFlag 是否为1,并且在这种情况下以原子方式将其设置为零并恢复工作。 (我想这是一个错误,但我也在这里尝试了其他几件事但没有成功。)如果s.needsToResumeWorkFlag 已经为零,它会自动将g.numThreadsReady 增加一,如果它以前没有这样做的话。 (回想一下,g.numThreadsReady 在对g.notifyAll() 的函数调用期间被重置。)然后它自动检查g.numThreadsReady 是否等于g.numThreads,在这种情况下它可以离开(在向通道发送虚拟宝藏之后) .否则我们重新开始,直到这个线程被通知(可能由它自己)或所有线程都准备好。

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // If new work has been added, the thread
  // is notified by setting the uint32 
  // at which needsToResumeWorkFlag points to 1.
  needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()

    numReadyIncremented := false
    for !s.needsToResumeWork() {
      if !numReadyIncremented {
        atomic.AddUint32(g.numThreadsReady,1)
        numReadyIncremented = true
      }
      if s.allThreadsReady() {
        return
      }
    }

  }
}

func (s *worker) needsToResumeWork() bool {
  s.notifyAll()
  for {
    if atomic.LoadUint32(g.allInformedFlag) == 1 {
      if atomic.CompareAndSwapUint32(s.needsToResumeWorkFlag, 1, 0) {
        return true
      } else {
        return false
      }
    }
  }
}

func (s *worker) notifyAll() {
  g.notifyAll()
}

func (g *G) allThreadsReady() bool {
  if atomic.LoadUint32(g.numThreadsReady) == g.numThreads {
    return true
  } else {
    return false
  }
}

如前所述,我的解决方案不起作用。

【问题讨论】:

  • (我也很欣赏无关的有用评论。) — 我的建议是继续努力,尽量减少这个例子。您显然在写这个问题时付出了很多努力,但是它可能只会达到相反的结果并将回答者赶走,前提是它不会完全吸引反对票或接近票。通过将问题简化为一个最小的示例,您自己也可能获得更多的清晰度。谢谢。
  • @blackgreen 谢谢。我将尝试将其简化为一个最小的示例,但有几天我会保持这样的状态。我觉得我需要先拉一段距离。
  • 你考虑过sync.Cond吗?我试着理解,但这里有很多东西,功能约束和性能约束交织在一起,让我难以形成一个有凝聚力的画面,那就是我的粥脑。最好列出解决方案中所需的具体设计要求。
  • 我理解提供尽可能多信息的冲动,但我同意 blackgreen 在这种情况下您提供太多的信息,特别是有太多细节似乎与实际问题无关。 FWIW:我不认为在 go 中实现原型是有帮助的,因为 go 有一个非常不同的并发模型,所以很有可能你实现的任何东西都不能轻易地转移到 C++。相反,请尝试提出一个简化的最小示例,以维护您实际问题的重要方面。
  • FWIW:如果您使用的是 C++20,我认为将单个 std::atomic 与等待/通知一起使用可能就足够了,但我需要先更好地理解问题。跨度>

标签: multithreading go concurrency parallel-processing lock-free


【解决方案1】:

我自己找到了解决方案。我们利用,如果没有添加新信息(而且很便宜),对s.processAllInformation() 的调用不会执行任何操作。诀窍是使用原子变量作为两者的锁,以便每个线程在必要时通知所有线程并检查它是否已被通知。如果无法获得锁,则只需再次调用s.processAllInformation()。然后一个线程使用通知来检查它是否必须增加就绪线程的计数器,而不是查看它是否需要返回工作。

全球形势

type G struct {
  numThreads           int
  numThreadsReady      *uint32 // initialize to 0 somewhere appropriate
  notifyAllFlag        *uint32 // initialize to 0 somewhere appropriate
  allCanGoFlag         *uint32 // initialize to 0 somewhere appropriate
  lock                 *uint32 // initialize to 0 somewhere appropriate

  // some data, that the workers need access to
}

// This function is thread-safe in so far as several
// workers can concurrently add information.
// 
// The function is optimized for heavy contention; most
// threads can leave almost immediately. One threads 
// cleans up any mess they leave behind (and even in 
// bad cases that is not too much).
func (g *G) addInformation(i infoType) {
  // Step 1: Make information available to all threads.
  // Step 2: Enqueue information at the end of some array.

  // Since the responsibility to enqueue an information may
  // be passed to another thread, it is important that the
  // last step is executed by the thread which enqueues the 
  // information(s) in order to ensure, that the information
  // successfully has been enqueued.

  // Step 3:
  atomic.StoreUint32(g.notifyAllFlag,1)        // all threads need to be notified
}

// If a new information has been added, we must ensure, 
// that every thread, that had finished, resumes work 
// and processes any newly added informations. 
//
// This function is not thread-safe. Make sure not to 
// have several threads call this function concurrently
// if these calls are not guarded by some lock.
func (g *G) notifyAll() {
  if atomic.LoadUint32(g.notifyAllFlag,1) {    
    for j := range g.s {
      atomic.StoreUint32(g.s[j].needsToResumeWorkFlag, 1)
    }
    atomic.StoreUint32(g.notifyAllFlag,0)
    atomic.StoreUint32(g.numThreadsReady,0)
}

当地情况

type worker struct {
  // Each worker contains a pointer to g
  // such that it has access to its member
  // variables and is able to call the
  // function g.addInformation(i) as soon 
  // as it finds some information i.
  g    *G 

  // If new work has been added, the thread
  // is notified by setting the uint32 
  // at which needsToResumeWorkFlag points to 1.
  needsToResumeWorkFlag *uint32 // initialize to 0 somewhere appropriate

  incrementedNumReadyFlag *uint32 // initialize to 0 somewhere appropriate

  // Also contains some other stuff. 
}

func (s *worker) FooInParallel(wg *sync.WaitGroup) {
  defer wg.Done()
  for {
    a := s.processAllInformation()

    if atomic.LoadUint32(s.g.allCanGoFlag, 1) {
      return
    }

    if atomic.CompareAndSwapUint32(g.lock,0,1) { // If possible, lock.
      s.g.notifyAll() // It is essential, that this is also guarded by the lock.

      if atomic.LoadUint32(s.needsToResumeWorkFlag) == 1 {
        atomic.StoreUint32(s.needsToResumeWorkFlag,0)

        // Some new information was found, and this thread can't be sure,
        // whether it already has processed it. Since the counter for
        // how many threads are ready had been reset, we must increment
        // that counter after the next call processAllInformation() in the 
        // following iteration.
        atomic.StoreUint32(s.incrementedNumReadyFlag,0)

      } else {

        // Increment number of ready threads by one, if this thread had not 
        // done this before (since the last newly found information).
        if atomic.CompareAndSwapUint32(s.incrementedNumReadyFlag,0,1) {
          atomic.AddUint32(s.g.numThreadsReady,1)
        }

        // If all threads are ready, give them all a signal.
        if atomic.LoadUint32(s.g.numThreadsReady) == s.g.numThreads {
          atomic.StoreUint32(s.g.allCanGo, 1)
        }

      }

      atomic.StoreUint32(g.lock,0) // Unlock.
    }

  }
}

稍后我可能会为线程在激烈争用下访问锁添加一些顺序,但现在就可以了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-04
    • 2012-07-02
    • 2016-05-16
    • 2023-03-29
    • 1970-01-01
    • 2019-06-22
    • 1970-01-01
    相关资源
    最近更新 更多