【问题标题】:Concurrently write multiple csv files from one, splitting on a partition column in Golang从一个并发写入多个csv文件,在Golang中的一个分区列上拆分
【发布时间】:2018-04-27 23:03:49
【问题描述】:

我的目标是读取一个或多个共享通用格式的 csv 文件,并根据 csv 数据中的分区列写入单独的文件。请允许最后一列是分区,数据未排序,并且可以在多个文件中找到给定的分区。一个文件的例子:

fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02

如果这种方法闻起来像可怕的 XY 问题,我很乐意调整。

到目前为止我已经尝试过:

  • 读入数据集并遍历每一行
  • 如果分区有 被看到,分拆一个新的工作例程(这将包含一个文件/csv 作家)。将线路发送到chan []string
  • 由于每个工作器都是文件写入器,因此它应该只接收其输入通道上恰好一个分区的行。

这显然行不通(目前),因为我不知道如何根据给定行上看到的分区值将行发送到正确的工作人员。

我已经为每个工作人员分配了每个分区值的 id string,但我不知道如何选择要发送到的工作人员,如果我应该为每个工作人员创建单独的 chan []string 并发送到该通道select,或者如果一个结构体应该为每个工作人员提供某种池和路由功能。

TLDR;我不知道如何根据某些分类 string 值有条件地将数据发送到给定的 goroutine 或通道,其中唯一的数量可以是任意的,但可能不超过 24 个唯一分区值。

我会警告说,我注意到这样的问题确实会被否决,所以如果你觉得这是反建设性的或不完整的足以被否决的,请评论为什么这样我可以避免重复冒犯。

提前感谢您的帮助!

Playground

片段:

  package main

    import (
        "encoding/csv"
        "fmt"
        "log"
        "strings"
        "time"
    )

    func main() {

        // CSV
        r := csv.NewReader(csvFile1)
        lines, err := r.ReadAll()
        if err != nil {
            log.Fatalf("error reading all lines: %v", err)
        }

        // CHANNELS
        lineChan := make(chan []string)

        // TRACKER
        var seenPartitions []string

        for _, line := range lines {

            hour := line[6]
            if !stringInSlice(hour, seenPartitions) {
                seenPartitions = append(seenPartitions, hour)
                go worker(hour, lineChan)
            }
            // How to send to the correct worker/channel? 
            lineChan <- line

        }
        close(lineChan)
    }

    func worker(id string, lineChan <-chan []string) {
        for j := range lineChan {
            fmt.Println("worker", id, "started  job", j)
            // Write to a new file here and wait for input over the channel
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", j)
        }
    }

    func stringInSlice(str string, list []string) bool {
        for _, v := range list {
            if v == str {
                return true
            }
        }
        return false
    }

    // DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04 
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)

【问题讨论】:

  • 创建一个map[string](chan []string)。然后给定一个分区键,您可以将该行发送到相应的通道。
  • 同意@zerkms。或者为了在保持简单的同时获得更大的灵活性,让每个工作人员成为一个包含其 ID 的 struct 类型的实例、一个用于发送行的通道、一个退出通道来告诉它何时停止/刷新/关闭,以及它需要的任何其他内容,然后按住 map[string]worker 并使用它将正确的线路发送给正确的工作人员。
  • @gpanda 频道不太适合充当未绑定的完全有序队列。相反,您可以创建一个持有锁(互斥体)并在内部实现为切片的结构。然后在每个push 上,您将一个元素附加到切片上,在每个pop 上,您从头部取出一个元素。
  • 观点很好,@zerkms。不过,通过这种方法,我已经很接近了!在所有例程完成后陷入僵局。 play.golang.org/p/8T1swVHlgT 将在我尝试了您的多路复用器解决方案后对此进行处理并报告。
  • @gpanda 我认为这不是线程安全的:workerPool 访问也必须同步,因为您同时对其进行读写。除此之外,当你让它并发安全时,你会发现你的实现中没有任何东西保证顺序:所以行可能被安排以任意顺序处理。

标签: csv go concurrency channel


【解决方案1】:

同步版本先不要去并发魔术(请参阅下面的并发版本)。

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
)

func main() {

    // CSV
    r := csv.NewReader(csvFile1)
    partitions := make(map[string][][]string)

    for {
        rec, err := r.Read()
        if err != nil {
            if err == io.EOF {
                err = nil

                save_partitions(partitions)

                return
            }
            log.Fatal(err)
        }

        process(rec, partitions)
    }

}

// prints only
func save_partitions(partitions map[string][][]string) {
    for part, recs := range partitions {
        fmt.Println(part)
        for _, rec := range recs {
            fmt.Println(rec)
        }
    }
}

// this can also write/append directly to a file
func process(rec []string, partitions map[string][][]string) {
    l := len(rec)
    part := rec[l-1]
    if p, ok := partitions[part]; ok {
        partitions[part] = append(p, rec)
    } else {
        partitions[part] = [][]string{rec}
    }
}

// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/--iqZGzxCF

以及并发版本:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
)

var (
    // list of channels to communicate with workers
    // workers accessed synchronousely no mutex required
    workers = make(map[string]chan []string)

    // wg is to make sure all workers done before exiting main
    wg = sync.WaitGroup{}

    // mu used only for sequential printing, not relevant for program logic
    mu = sync.Mutex{}
)

func main() {

    // wait for all workers to finish up before exit
    defer wg.Wait()

    r := csv.NewReader(csvFile1)

    for {
        rec, err := r.Read()
        if err != nil {
            if err == io.EOF {
                savePartitions()
                return
            }
            log.Fatal(err) // sorry for the panic
        }
        process(rec)
    }

}

func process(rec []string) {
    l := len(rec)
    part := rec[l-1]

    if c, ok := workers[part]; ok {
        // send rec to worker
        c <- rec
    } else {
        // if no worker for the partition

        // make a chan
        nc := make(chan []string)
        workers[part] = nc

        // start worker with this chan
        go worker(nc)

        // send rec to worker via chan
        nc <- rec
    }
}

func worker(c chan []string) {

    // wg.Done signals to main worker completion
    wg.Add(1)
    defer wg.Done()

    part := [][]string{}
    for {
        // wait for a rec or close(chan)
        rec, ok := <-c
        if ok {
            // save the rec
            // instead of accumulation in memory
            // this can be saved to file directly
            part = append(part, rec)
        } else {
            // channel closed on EOF

            // dump partition
            // locks ensures sequential printing
            // not a required for independent files
            mu.Lock()
            for _, p := range part {
                fmt.Printf("%+v\n", p)
            }
            mu.Unlock()

            return
        }
    }
}

// simply signals to workers to stop
func savePartitions() {
    for _, c := range workers {
        // signal to all workers to exit
        close(c)
    }
}

// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/oBTPosy0yT

玩得开心!

【讨论】:

  • 我绝对会看看这对我的用例是如何工作的,并在有任何问题时报告。谢谢!
  • 两种解决方案都很棒,但第二种特别优雅。我特别欣赏用于在 worker() 中关闭 chan 的 , ok 语法以及解释性 cmets。我很好奇关于独立文件不需要 Lock() 的评论。我认为这是我将适应为每个分区写入单独的 csv 的地方 - 你确定这里不需要锁来将每个分区写入文件吗?
  • @gpanda 是的,我很确定您不需要为此锁定。之所以需要它,是因为在打印时我们有一个共享资源标准输出到我们打印的位置。尝试注释掉mu.Lock and mu.Unlock。除了行的顺序是随机的,不会发生任何不好的事情。有了这个锁,每个工人都在说“大家说话的时候闭嘴”。当您每个工作人员拥有一个文件时,您不会共享任何内容,因此您可以将part := [][]string{} 视为您的文件。而且我们不会锁定它以附加到它。
猜你喜欢
  • 2016-07-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-11
  • 2021-06-08
  • 2019-04-03
  • 1970-01-01
相关资源
最近更新 更多