【问题标题】:Why is data being pushed into the channel but never read from the receiver goroutine?为什么数据被推入通道但从未从接收器 goroutine 中读取?
【发布时间】:2021-12-05 17:48:20
【问题描述】:

我正在构建一个守护进程,我有两个服务将相互发送数据。服务 A 产生数据,服务 B a 是数据缓冲区服务或类似队列。因此,从main.go 文件中,服务 B 被实例化并启动。 Start() 方法将 buffer() 函数作为 goroutine 执行,因为此函数等待数据传递到通道上,我不希望主进程停止等待 buffer 完成。然后服务 A 被实例化并启动。然后它也被“注册”到服务 B。

我为服务 A 创建了一个名为 RegisterWithBufferService 的方法,该方法创建了两个新通道。它将这些通道存储为它自己的属性,并将它们提供给服务 B。

func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
    newIncomingChan := make(chan *data.DataFrame, 1)
    newOutgoingChan := make(chan []byte, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    s.Logger.Info().Msg("Registeration completed.")
    return nil
}

Buffer 主要监听来自服务 A 的传入数据,使用 Decode() 对其进行解码,然后将其添加到名为 buf 的切片中。如果分片的长度大于bufferPeriod,那么它将把 Outgoing 通道中分片中的第一项发送回服务 A。

func (b* DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            b.wg.Add(1)
            /*
            newProvider is a string
            DataProviders is a map the value it returns is a struct containing the Incoming and 
            Outgoing channels for this service
            */
            p := b.DataProviders[newProvider]
            go func(prov string, in chan []byte, out chan *DataFrame) {
                defer b.wg.Done()
                var buf []*DataFrame
                for {
                    select {
                    case rawData := <-in:
                        tmp := Decode(rawData) //custom decoding function. Returns a *DataFrame
                        buf = append(buf, tmp)
                        if len(buf) < bufferPeriod {
                            b.Logger.Info().Msg("Sending decoded data out.")
                            out <- buf[0]
                            buf = buf[1:] //pop
                        }
                    case <- b.Quit:
                        return
                    }
                }
            }(newProvider, p.IncomingChan, p.OutgoingChan)
        }
    case <- b.Quit:
        return
    }
}

现在服务 A 有一个名为 record 的方法,它会定期将数据推送到其 OutgoingDataChannels 属性中的所有通道。

func (s *ServiceA) record() error {
    ...
    if atomic.LoadInt32(&s.Listeners) != 0 {
        s.Logger.Info().Msg("Sending raw data to data buffer")
        for _, outChan := range s.OutgoingDataChannels {
            outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
        }
        s.Logger.Info().Msg("Raw data sent and received") // The logger will output this so I know it's not hanging 
    }
}

问题是服务 A 似乎使用 record 成功推送数据,但服务 B 从未进入 buffer 子 goroutine 中的 case rawData := &lt;-in: 案例。这是因为我有嵌套的 goroutines 吗?如果不清楚,当服务 B 启动时,它会调用 buffer 但因为它会挂起,所以我调用了 buffer 一个 goroutine。因此,当服务 A 调用 RegisterWithBufferService 时,buffer goroutine 会创建一个 goroutine 来侦听来自服务 B 的新数据,并在缓冲区填满后将其推送回服务 A。我希望我解释清楚了。

编辑 1 我做了一个最小的、可重复的例子。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    defaultBufferingPeriod int = 3
    DefaultPollingInterval int64 = 10
)

type DataObject struct{
    Data    string
}

type DataProvider interface {
    RegisterWithBufferService(*DataBuffer) error
    ServiceName() string
}

type DataProviderInfo struct{
    IncomingChan    chan *DataObject
    OutgoingChan    chan *DataObject
}

type DataBuffer struct{
    Running         int32 //used atomically
    DataProviders   map[string]DataProviderInfo
    Quit            chan struct{}
    NewProvider     chan string
    wg              sync.WaitGroup
}

func NewDataBuffer() *DataBuffer{
    var (
        wg sync.WaitGroup
    )
    return &DataBuffer{
        DataProviders: make(map[string]DataProviderInfo),
        Quit: make(chan struct{}),
        NewProvider: make(chan string),
        wg: wg,
    }
}

func (b *DataBuffer) Start() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
        return fmt.Errorf("Could not start Data Buffer Service.")
    }
    go b.buffer(defaultBufferingPeriod)
    return nil
}

func (b *DataBuffer) Stop() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
        return fmt.Errorf("Could not stop Data Buffer Service.")
    }
    for _, p := range b.DataProviders {
        close(p.IncomingChan)
        close(p.OutgoingChan)
    }
    close(b.Quit)
    b.wg.Wait()
    return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            fmt.Println("Received new Data provider.")
            if _, ok := b.DataProviders[newProvider]; ok { 
                b.wg.Add(1)
                p := b.DataProviders[newProvider]
                go func(prov string, in chan *DataObject, out chan *DataObject) {
                    defer b.wg.Done()
                    var (
                        buf []*DataObject
                    )
                    fmt.Printf("Waiting for data from: %s\n", prov)
                    for {
                        select {
                        case inData := <-in:
                            fmt.Printf("Received data from: %s\n", prov)
                            buf = append(buf, inData)
                            if len(buf) > bufferPeriod {
                                fmt.Printf("Queue is filled, sending data back to %s\n", prov)
                                out <- buf[0]
                                fmt.Println("Data Sent")
                                buf = buf[1:] //pop
                            }
                        case <- b.Quit:
                            return
                        }
                    }
                }(newProvider, p.IncomingChan, p.OutgoingChan)
            }
        case <- b.Quit:
            return
        }
    }
}

type ServiceA struct{
    Active                  int32 // atomic
    Stopping                int32 // atomic
    Recording               int32 // atomic
    Listeners               int32 // atomic
    name                    string
    QuitChan                chan struct{}
    IncomingBuffChan        chan *DataObject
    OutgoingBuffChans       []chan *DataObject
    DataBufferService       *DataBuffer
}

// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)

func NewServiceA() (*ServiceA, error) {
    var newSliceOutChans []chan *DataObject
    return &ServiceA{
        QuitChan:  make(chan struct{}),
        OutgoingBuffChans: newSliceOutChans,
        name:   "SERVICEA",
    }, nil
}

// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
    atomic.StoreInt32(&s.Active, 1)
    return nil
}

// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
    atomic.StoreInt32(&s.Stopping, 1)
    close(s.QuitChan)
    return nil
}

func (s *ServiceA) StartRecording(pol_int int64) error {
    if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
        return fmt.Errorf("Could not start recording. Data recording already started")
    }
    ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
    go func() {
        for {
            select {
            case <-ticker.C:
                fmt.Println("Time to record...")
                err := s.record()
                if err != nil {
                    return
                }
            case <-s.QuitChan:
                ticker.Stop()
                return
            }
        }
    }()
    return nil
}

func (s *ServiceA) record() error {
    current_time := time.Now()
    ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
    dataObject := &DataObject{
        Data: ct,
    }
    if atomic.LoadInt32(&s.Listeners) != 0 {
        fmt.Println("Sending data to Data buffer...")
        for _, outChan := range s.OutgoingBuffChans {
            outChan <- dataObject // the receivers should already be listening
        }
        fmt.Println("Data sent.")
    }
    return nil
}

// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
    if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
        return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
    }
    newIncomingChan := make(chan *DataObject, 1)
    newOutgoingChan := make(chan *DataObject, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    return nil
}

// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
    return s.name
}

func main() {
    var BufferedServices []DataProvider
    fmt.Println("Instantiating and Starting Data Buffer Service...")
    bufService := NewDataBuffer()
    err := bufService.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer bufService.Stop()
    fmt.Println("Data Buffer Service successfully started.")

    fmt.Println("Instantiating and Starting Service A...")
    serviceA, err := NewServiceA()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    BufferedServices = append(BufferedServices, *serviceA)
    err = serviceA.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer serviceA.Stop()
    fmt.Println("Service A successfully started.")

    fmt.Println("Registering services with Data Buffer...")
    for _, s := range BufferedServices {
        _ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
    }
    fmt.Println("Registration complete.")

    fmt.Println("Beginning recording...")
    _ = atomic.AddInt32(&serviceA.Listeners, 1)
    err = serviceA.StartRecording(DefaultPollingInterval)
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    for {
        select {
        case RTD := <-serviceA.IncomingBuffChan:
            fmt.Println(RTD)
        case <-serviceA.QuitChan:
            atomic.StoreInt32(&serviceA.Listeners, 0)
            bufService.Quit<-struct{}{}
        }
    }
}

在 Go 1.17 上运行。运行示例时,它应该每 10 秒打印一次:

Time to record...
Sending data to Data buffer...
Data sent.

但是数据缓冲区永远不会进入inData := &lt;-in 的情况。

【问题讨论】:

  • 您能提供minimal , reproducible example吗?我可以在您提供的内容中看到一些潜在的竞争条件(例如bufService.DataProviders),并且您的缓冲算法看起来很可疑(为什么len(buf) &lt; bufferPeriod - 我希望这会触发每次迭代,或者永远不会触发bufferPeriod==0?)但是没有完整的例子,很难进一步评论。
  • 当然,我会这样做并在 OP 中标记一个编辑。
  • @Brits 我做了一个最小的、可重复的例子。顺便说一句,您对len(buf) &lt; bufferPeriod 的看法是正确的,它应该是len(buf) &gt; bufferPeriod,因此它将切片中的第一项传递到传出通道。感谢您指出这一点。

标签: go channel goroutine


【解决方案1】:

为了诊断这个问题,我将fmt.Println("Sending data to Data buffer...") 更改为fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans),输出为:

Time to record...
Sending data to Data buffer... []

因此,您实际上并没有将数据发送到任何渠道。这样做的原因是:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

当您执行s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) 时,接收者不是指针,您正在更改s.OutgoingBuffChans 的副本中的ServiceA,该副本在函数退出时被丢弃。要修复此更改:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

BufferedServices = append(BufferedServices, *serviceA)

BufferedServices = append(BufferedServices, serviceA)

修改后的版本输出:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA

所以这解决了报告的问题(如果还有其他问题,我不会感到惊讶,但希望这会为您指明正确的方向)。我确实注意到您最初发布的代码确实使用了指针接收器,因此可能遇到了另一个问题(但在这种情况下很难评论代码片段)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-06-26
    • 2016-03-16
    • 2014-10-10
    • 1970-01-01
    • 2022-01-02
    • 2019-11-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多