【问题标题】:Go - wait for next item in a priority queue if emptyGo - 如果为空,则等待优先队列中的下一个项目
【发布时间】:2015-09-12 15:08:45
【问题描述】:

我正在尝试实现一个优先级队列,以通过基于优先级的网络套接字发送 json 对象。我正在使用container/heap 包来实现队列。我想出了这样的事情:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

有没有比轮询优先级队列更好的方法来等待新项目?

【问题讨论】:

  • 也许使用频道更适合:gobyexample.com/channels
  • 我没有给出这个答案,因为我无法拿出一段代码来证明它,但是在你的实现中使用通道来发出信号,当一个项目被推入队列时呢?然后,您可以让队列的用户在频道上等待,然后当您在频道上收到信号时退出队列。
  • @LeoCorrea 我正在写这样的答案。这是一个嵌入堆并添加了newItem 通道的结构。然后我意识到你也可以摆脱那些多余的废话并直接使用频道。为什么要告诉某人“现在你可以弹出”而不是通过通道发送它来自动弹出项目?还内置了阻塞机制,因此您不需要这些睡眠或等待或其他任何东西。
  • @evanmcdonnal 那么您如何为该实现设置优先级?
  • @LeoCorrea 哦,是的,我知道我在想什么。我最初的设计是他想要的,尽管我实际上并不喜欢那样。我认为在频道上发信号告诉某人它可以弹出队列是一种黑客行为。它使用推/拉机制来完成拉动的目标。相反,我认为一个更好的主意是颠倒事情并推送,维护一个有序列表,并将您想要出列的项目推送到使用它的任何代码。

标签: go heap priority-queue


【解决方案1】:

我可能会使用几个队列 goroutine。从PriorityQueue example 中的数据结构开始,我会构建一个这样的函数:

http://play.golang.org/p/hcNFX8ehBW

func queue(in <-chan *Item, out chan<- *Item) {
    // Make us a queue!
    pq := make(PriorityQueue, 0)
    heap.Init(&pq)

    var currentItem *Item       // Our item "in hand"
    var currentIn = in          // Current input channel (may be nil sometimes)
    var currentOut chan<- *Item // Current output channel (starts nil until we have something)

    defer close(out)

    for {
        select {
        // Read from the input
        case item, ok := <-currentIn:
            if !ok {
                // The input has been closed. Don't keep trying to read it
                currentIn = nil
                // If there's nothing pending to write, we're done
                if currentItem == nil {
                    return
                }
                continue
            }

            // Were we holding something to write? Put it back.
            if currentItem != nil {
                heap.Push(&pq, currentItem)
            }

            // Put our new thing on the queue
            heap.Push(&pq, item)

            // Turn on the output queue if it's not turned on
            currentOut = out

            // Grab our best item. We know there's at least one. We just put it there.
            currentItem = heap.Pop(&pq).(*Item)

            // Write to the output
        case currentOut <- currentItem:
            // OK, we wrote. Is there anything else?
            if len(pq) > 0 {
                // Hold onto it for next time
                currentItem = heap.Pop(&pq).(*Item)
            } else {
                // Oh well, nothing to write. Is the input stream done?
                if currentIn == nil {
                    // Then we're done
                    return
                }

                // Otherwise, turn off the output stream for now.
                currentItem = nil
                currentOut = nil
            }
        }
    }
}

这是一个使用它的例子:

func main() {
    // Some items and their priorities.
    items := map[string]int{
        "banana": 3, "apple": 2, "pear": 4,
    }

    in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
    out := make(chan *Item)    // But the system will "work" for any particular values

    // Start the queuing engine!
    go queue(in, out)

    // Stick some stuff on in another goroutine
    go func() {
        i := 0
        for value, priority := range items {
            in <- &Item{
                value:    value,
                priority: priority,
                index:    i,
            }
            i++
        }
        close(in)
    }()

    // Read the results
    for item := range out {
        fmt.Printf("%.2d:%s ", item.priority, item.value)
    }
    fmt.Println()
}

请注意,如果您运行此示例,则每次的顺序都会略有不同。这当然是意料之中的。这取决于输入和输出通道的运行速度。

【讨论】:

    【解决方案2】:

    一种方法是使用sync.Cond:

    Cond 实现了一个条件变量,一个等待或宣布事件发生的 goroutines 的集合点。

    可以将包中的一个示例修改如下(针对消费者):

    c.L.Lock()
    for heap.Len() == 0 {
        c.Wait() // Will wait until signalled by pushing routine
    }
    item := heap.Pop(&pq).(*Item)
    c.L.Unlock()
    // Do stuff with the item
    

    制作人可以简单地做:

    c.L.Lock()
    heap.Push(x)
    c.L.Unlock()
    c.Signal()
    

    (将这些包装在函数中并使用延迟可能是个好主意。)

    这是一个线程安全(朴素)堆的示例,pop 方法会等待直到项目可用:

    package main
    
    import (
        "fmt"
        "sort"
        "sync"
        "time"
        "math/rand"
    )
    
    type Heap struct {
        b []int
        c *sync.Cond
    }
    
    func NewHeap() *Heap {
        return &Heap{c: sync.NewCond(new(sync.Mutex))}
    }
    
    // Pop (waits until anything available)
    func (h *Heap) Pop() int {
        h.c.L.Lock()
        defer h.c.L.Unlock()
        for len(h.b) == 0 {
            h.c.Wait()
        }
        // There is definitely something in there
        x := h.b[len(h.b)-1]
        h.b = h.b[:len(h.b)-1]
        return x
    }
    
    func (h *Heap) Push(x int) {
        defer h.c.Signal() // will wake up a popper
        h.c.L.Lock()
        defer h.c.L.Unlock()
        // Add and sort to maintain priority (not really how the heap works)
        h.b = append(h.b, x)
        sort.Ints(h.b)
    }
    
    func main() {
        heap := NewHeap()
    
        go func() {
            for range time.Tick(time.Second) {
                for n := 0; n < 3; n++ {
                    x := rand.Intn(100)
                    fmt.Println("push:", x)
                    heap.Push(x)
                }
            }
        }()
    
        for {
            item := heap.Pop()
            fmt.Println("pop: ", item)
        }
    }
    

    (请注意,由于for range time.Tick 循环,这在操场上不起作用。在本地运行它。)

    【讨论】:

    • 您的代码似乎无法在 play.golang.org 上运行。
    • 您是正确的,经过编辑的答案并带有评论。由于无限的for range time.Tick 循环和游乐场处理时间的方式(请参阅blog.golang.org/playground),它在游乐场上不起作用。这个简单的例子也无法运行:play.golang.org/p/2LXDXdEBxu
    猜你喜欢
    • 2011-10-31
    • 2012-02-24
    • 2012-11-26
    • 1970-01-01
    • 1970-01-01
    • 2012-03-06
    • 2011-01-18
    • 1970-01-01
    相关资源
    最近更新 更多