【问题标题】:Go routine:Making concurrent API requestsGo 例程:发出并发 API 请求
【发布时间】:2018-01-02 10:06:41
【问题描述】:

我正在尝试了解通道和 goroutine,并尝试编写一个 goroutine 来向服务器发出并发 API 请求

但是,当我使用 goroutine 运行代码时,它所花费的时间似乎与不使用 goroutine 的时间相同。

func sendUser(user string, ch chan<- string)  {
    resp,err := http.get("URL"/user)
    //do the processing and get resp=string
    ch <- resp
}


func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var user string

    for _ , user = range users {
        go sendUser(user, ch)

        for {
            select {
            case r := <-ch:
                if r.err != nil {
                    fmt.Println(r.err)
                }
                responses = append(responses, r)
                **//Is there a better way to show that the processing of response is complete**?
                if len(responses) == len(users) { 
                    return responses, nil
                }
            case <-time.After(50 * time.Millisecond):
                fmt.Printf(".")
            }
        }
    }
    return responses, nil
}

问题:

  1. 即使我使用了 goroutine,请求完成时间与没有 goroutine 时一样吗?我在 goroutines 上做错了什么吗?

  2. 为了告诉工作不要再在这里等待,我正在使用:

    if len(responses) == len(users)
    

    有没有更好的方法来表明响应的处理已经完成并告诉 ch 不要再等待了?

  3. 什么是wait.Syncgroup?如何在我的 goroutine 中使用它?

【问题讨论】:

  • 您正在使用无缓冲通道。用make(chan string, 10) 或任何你想缓冲的数量来制作你的chan。
  • 其实这不是问题。在您的循环中,您正在调用go sendUser,但随后立即在循环中等待它。 for 先循环所有的 go sendUser 调用,然后再循环那里的其余逻辑。
  • @RayfenWindspear 但这不会导致死锁,因为通道只有 1 个缓冲区,如果我将“select”语句放在循环之外,那么一个缓冲区将如何保存来自的所有响应请求?
  • 这就是它的美妙之处,频道不需要持有任何东西。一个无缓冲的通道基本上什么都没有,它直接将它传递给等待接收它的例程。您不在乎是否所有 100 或 1000 个 goroutine 都阻塞了 ch &lt;- resp。他们已经完成了花费最多时间的 io 部分(http.get)。现在您可以一次使用一个响应。或者,如果需要,您可以使用更多 goroutine 来更快地使用响应。
  • @RayfenWindspear“或者如果你愿意,你可以使用更多的 goroutine 来更快地使用响应”。你能再解释一下吗?我怎样才能实现它?有没有更好的方法而不是 if len(responses) == len(users)

标签: go concurrency channel goroutine


【解决方案1】:

我可能会做这样的事情..

func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get("URL/" + user)
    if err != nil {
        log.Println("err handle it")
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Println("err handle it")
    }
    ch <- string(b)
}

func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var user string
    var wg sync.WaitGroup
    for _, user = range users {
        wg.Add(1)
        go sendUser(user, ch, &wg)
    }

    // close the channel in the background
    go func() {
        wg.Wait()
        close(ch)
    }()
    // read from channel as they come in until its closed
    for res := range ch {
        responses = append(responses, res)
    }

    return responses, nil
}

它允许在发送通道时从通道中读取。通过使用等待组,我将知道何时关闭频道。通过将等待组和关闭放在一个 goroutine 中,我可以“实时”从通道中读取而不会阻塞。

【讨论】:

  • 不错。关于运行并发 io 的再评论。请注意单个请求涉及多少内存。运行数千个 goroutine 是多么容易,很容易让人忘乎所以,但如果每个请求对象最终占用 1MB 内存,那么你就在数千个 GB 范围内。您可以使用缓冲通道来限制速率。
  • @rayfenwindspear 我完全同意我认为在生产中我会使用信号量来运行它以限制请求甚至内存检查。
  • @retcentroot @RayfenWindspear 谢谢大家。缓冲通道如何帮助限制速率?如果我理解正确,那么 _, user = range users { wg.Add(1) go sendUser(user, ch, &wg) } 的这个循环正在旋转 goroutines 的数量 = 用户范围。缓冲如何对此有所帮助案例?我怎样才能限制只有正常数量的工人启动并将其返回到渠道,然后接管其余的工作?
  • 缓冲通道就像一个带有队列的存储桶,如果该存储桶已满,它将阻塞。因此,例如,如果限制为 10,则只有 10 个 goroutine 会启动,则从桶中消耗一个项目,另一个通道将被发送。先进先出。假设您有 100 个网址。最多只能同时处理 10 个 VS 旋转 100 个 goroutines
【解决方案2】:

对于有限并行/速率限制,我们可以在https://blog.golang.org/pipelines#TOC_9.看一个例子

基本上步骤是:

  1. 将用于调用 API 的输入/参数/参数流式传输到输入通道。
  2. 运行N worker goroutines,每个使用相同的(共享的)输入通道。从输入通道获取参数,调用 API,将结果发送到结果通道。
  3. 使用结果通道,如果有错误早点返回。

sync.WaitGroup 用于等待所有工作 goroutine 完成(在输入通道耗尽后)。

下面是它的代码示例(您可以立即运行它,尝试将NUM_PARALLEL 更改为不同的并行数)。将BASE_URL 更改为您的基本网址。

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"

// number of parallelism
const NUM_PARALLEL = 20

// Stream inputs to input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
    inputCh := make(chan string)
    go func() {
        defer close(inputCh)
        for _, input := range inputs {
            select {
            case inputCh <- input:
            case <-done:
                // in case done is closed prematurely (because error midway),
                // finish the loop (closing input channel)
                break
            }
        }
    }()
    return inputCh
}

// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
    url := BASE_URL + user
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    bodyStr := string(body)
    return bodyStr, nil
}

// Wrapper for sendUser return value, used as result channel type
type result struct {
    bodyStr string
    err     error
}

func AsyncHTTP(users []string) ([]string, error) {
    done := make(chan struct{})
    defer close(done)

    inputCh := streamInputs(done, users)

    var wg sync.WaitGroup
    // bulk add goroutine counter at the start
    wg.Add(NUM_PARALLEL)

    resultCh := make(chan result)

    for i := 0; i < NUM_PARALLEL; i++ {
        // spawn N worker goroutines, each is consuming a shared input channel.
        go func() {
            for input := range inputCh {
                bodyStr, err := sendUser(input)
                resultCh <- result{bodyStr, err}
            }
            wg.Done()
        }()
    }

    // Wait all worker goroutines to finish. Happens if there's no error (no early return)
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    results := []string{}
    for result := range resultCh {
        if result.err != nil {
            // return early. done channel is closed, thus input channel is also closed.
            // all worker goroutines stop working (because input channel is closed)
            return nil, result.err
        }
        results = append(results, result.bodyStr)
    }

    return results, nil
}

func main() {
    // populate users param
    users := []string{}
    for i := 1; i <= 100; i++ {
        users = append(users, strconv.Itoa(i))
    }

    start := time.Now()

    results, err := AsyncHTTP(users)
    if err != nil {
        fmt.Println(err)
        return
    }

    for _, result := range results {
        fmt.Println(result)
    }

    fmt.Println("finished in ", time.Since(start))
}

【讨论】:

    猜你喜欢
    • 2012-01-04
    • 2017-01-28
    • 2021-12-03
    • 1970-01-01
    • 1970-01-01
    • 2021-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多