【发布时间】:2021-09-27 22:16:00
【问题描述】:
我已经搜索了很多,但还没有找到我的问题的答案。
我需要多次调用外部 API,但同时使用不同的参数。 然后对于每个调用,我需要为每个数据集初始化一个结构并处理我从 API 调用接收到的数据。请记住,我读取了传入请求的每一行并立即开始将其发送到通道。
我遇到的第一个问题一开始并不明显,因为我接收的数据量很大,是每个 goroutine 并没有接收到所有通过通道的数据。 (我通过我所做的研究了解到)。所以我需要一种将数据重新排队/重定向到正确 goroutine 的方法。
从单个数据集发送流式响应的函数。 (我已经剪掉了无用的代码部分)
func (api *API) RequestData(ctx context.Context, c chan DWeatherResponse, dataset string, wg *sync.WaitGroup) error {
for {
line, err := reader.ReadBytes('\n')
s := string(line)
if err != nil {
log.Println("End of %s", dataset)
return err
}
data, err := extractDataFromStreamLine(s, dataset)
if err != nil {
continue
}
c <- *data
}
}
处理传入数据的函数
func (s *StrikeStruct) Process(ch, requeue chan dweather.DWeatherResponse) {
for {
data, more := <-ch
if !more {
break
}
// data contains {dataset string, value float64, date time.Time}
// The s.Parameter needs to match the dataset
// IMPORTANT PART, checks if the received data is part of this struct dataset
// If not I want to send it to another go routine until it gets to the correct
one. There will be a max of 4 datasets but still this could not be the best approach to have
if !api.GetDataset(s.Parameter, data.Dataset) {
requeue <- data
continue
}
// Do stuff with the data from this point
}
}
现在在我自己的 API 端点上,我有以下内容:
ch := make(chan dweather.DWeatherResponse, 2)
requeue := make(chan dweather.DWeatherResponse)
final := make(chan strike.StrikePerYearResponse)
var wg sync.WaitGroup
for _, s := range args.Parameters.Strikes {
strike := strike.StrikePerYear{
Parameter: strike.Parameter(s.Dataset),
StrikeValue: s.Value,
}
// I receive and process the data in here
go strike.ProcessStrikePerYear(ch, requeue, final, string(s.Dataset))
}
go func() {
for {
data, _ := <-requeue
ch <- data
}
}()
// Creates a goroutine for each dataset
for _, dataset := range api.Params.Dataset {
wg.Add(1)
go api.RequestData(ctx, ch, dataset, &wg)
}
wg.Wait()
close(ch)
//Once the data is all processed it is all appended
var strikes []strike.StrikePerYearResponse
for range args.Fetch.Datasets {
strikes = append(strikes, <-final)
}
return strikes
这段代码的问题是,一旦我开始从多个端点接收数据,requeue 就会阻塞,不会再发生任何事情。如果我删除了 requeue 逻辑数据,如果它没有落在正确的 goroutine 上,它将丢失。
我的两个问题是:
- 如果有一个总是准备好接收的 goroutine,为什么 requeue 会阻塞?
- 我是否应该采用不同的方法来处理传入的数据?
【问题讨论】:
-
听起来你应该让每个 goroutine 有一个单独的通道。尝试在单个共享通道上重新排队将是困难且低效的。
-
将您提取的所有结果放入一个负责将数据分配给其相应数据集的例程中。或者,预先确保正确的例程执行预期的请求以处理预期的结果。无论如何,重新排队听起来不对。
-
好的,非常感谢。我第一次使用并发并开始以这种方式构建代码,认为所有例程都会从通道接收相同的数据。我会试试你的建议
标签: go concurrency channel