【问题标题】:golang producer consumer number of messages receivedgolang生产者消费者接收到的消息数
【发布时间】:2022-01-15 00:17:13
【问题描述】:

我在 golang 中编写了生产者-消费者模式。读取多个 csv 文件并处理记录。我正在一口气读取所有csv文件的记录。

我想以包括所有 csv 文件在内的总记录的 5% 的间隔记录处理完成的百分比。例如,我有 3 个 csv 来处理,每个有 20,30,50 行/记录(因此总共要处理 100 条记录)想要在处理 5 条记录时记录进度。

func processData(inputCSVFiles []string) {
    producerCount := len(inputCSVFiles)
    consumerCount := producerCount

    link := make(chan []string, 100)
    wp := &sync.WaitGroup{}
    wc := &sync.WaitGroup{}

    wp.Add(producerCount)
    wc.Add(consumerCount)

    for i := 0; i < producerCount; i++ {
        go produce(link, inputCSVFiles[i], wp)
    }

    for i := 0; i < consumerCount; i++ {
        go consume(link, wc)
    }
    wp.Wait()
    close(link)
    wc.Wait()
    fmt.Println("Completed data migration process for all CSV data files.")
}

func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
    defer wg.Done()
    records := readCsvFile(filePath)
    totalNumberOfRecords := len(records)
    for _, record := range records {
        link <- record
    }
}

func consume(link <-chan []string, wg *sync.WaitGroup) {
    defer wg.Done()
    for record := range link {
        // process csv record
    }
}

【问题讨论】:

  • I want to log percentage of processing completion in interval of 5% of total records including all csv files. 您需要知道总记录集才能计算它。或者接受这一点,直到建立价值,结果会很奇怪。
  • 只是尝试一些东西。但如果代码是可重现的,那将是一个更好的问题。 go.dev/play/p/YcoZl16UuR3

标签: go channel consumer producer


【解决方案1】:

我使用了原子变量和计数器通道,其中消费者将在处理记录时推送计数,而其他 goroutine 将从通道中读取并计算总处理记录百分比。

var progressPercentageStep float64 = 5.0
var totalRecordsToProcess int32

func processData(inputCSVFiles []string) {
        producerCount := len(inputCSVFiles)
        consumerCount := producerCount
        link := make(chan []string, 100)
        counter := make(chan int, 100)
        defer close(counter)
        wp := &sync.WaitGroup{}
        wc := &sync.WaitGroup{}
    
        wp.Add(producerCount)
        wc.Add(consumerCount)
    
        for i := 0; i < producerCount; i++ {
            go produce(link, inputCSVFiles[i], wp)
        }

        go progressStats(counter)

        for i := 0; i < consumerCount; i++ {
            go consume(link, wc)
        }
        wp.Wait()
        close(link)
        wc.Wait()
        
    }
    
    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
        defer wg.Done()
        records := readCsvFile(filePath)
        atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))
        for _, record := range records {
            link <- record
        }
    }
    
    func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for record := range link {
            // process csv record
            counter <- 1
        }
    }
    
func progressStats(counter <-chan int) {
    var feedbackThreshold = progressPercentageStep
    for count := range counter {
        totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)
        donePercent := 100.0 * processed / totalRemaining
        // log progress
        if donePercent >= feedbackThreshold {
            log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)
            feedbackThreshold += progressPercentageStep
        }
    }
}

【讨论】:

  • 在没有原子的情况下阅读 totalRecordsToProcess 很有趣。
  • 当心linkchan&lt;- []string,但你发送的是record,一个字符串。
  • 我为你修好了比赛。其余的,您的编译器将帮助您解决其他问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-09-25
  • 1970-01-01
  • 2018-02-27
相关资源
最近更新 更多