【发布时间】:2018-04-27 23:03:49
【问题描述】:
我的目标是读取一个或多个共享通用格式的 csv 文件,并根据 csv 数据中的分区列写入单独的文件。请允许最后一列是分区,数据未排序,并且可以在多个文件中找到给定的分区。一个文件的例子:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
如果这种方法闻起来像可怕的 XY 问题,我很乐意调整。
到目前为止我已经尝试过:
- 读入数据集并遍历每一行
- 如果分区有
被看到,分拆一个新的工作例程(这将包含一个文件/csv
作家)。将线路发送到
chan []string。 - 由于每个工作器都是文件写入器,因此它应该只接收其输入通道上恰好一个分区的行。
这显然行不通(目前),因为我不知道如何根据给定行上看到的分区值将行发送到正确的工作人员。
我已经为每个工作人员分配了每个分区值的 id string,但我不知道如何选择要发送到的工作人员,如果我应该为每个工作人员创建单独的 chan []string 并发送到该通道select,或者如果一个结构体应该为每个工作人员提供某种池和路由功能。
TLDR;我不知道如何根据某些分类 string 值有条件地将数据发送到给定的 goroutine 或通道,其中唯一的数量可以是任意的,但可能不超过 24 个唯一分区值。
我会警告说,我注意到这样的问题确实会被否决,所以如果你觉得这是反建设性的或不完整的足以被否决的,请评论为什么这样我可以避免重复冒犯。
提前感谢您的帮助!
片段:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS
lineChan := make(chan []string)
// TRACKER
var seenPartitions []string
for _, line := range lines {
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
close(lineChan)
}
func worker(id string, lineChan <-chan []string) {
for j := range lineChan {
fmt.Println("worker", id, "started job", j)
// Write to a new file here and wait for input over the channel
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
}
}
func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
【问题讨论】:
-
创建一个
map[string](chan []string)。然后给定一个分区键,您可以将该行发送到相应的通道。 -
同意@zerkms。或者为了在保持简单的同时获得更大的灵活性,让每个工作人员成为一个包含其 ID 的
struct类型的实例、一个用于发送行的通道、一个退出通道来告诉它何时停止/刷新/关闭,以及它需要的任何其他内容,然后按住map[string]worker并使用它将正确的线路发送给正确的工作人员。 -
@gpanda 频道不太适合充当未绑定的完全有序队列。相反,您可以创建一个持有锁(互斥体)并在内部实现为切片的结构。然后在每个
push上,您将一个元素附加到切片上,在每个pop上,您从头部取出一个元素。 -
观点很好,@zerkms。不过,通过这种方法,我已经很接近了!在所有例程完成后陷入僵局。 play.golang.org/p/8T1swVHlgT 将在我尝试了您的多路复用器解决方案后对此进行处理并报告。
-
@gpanda 我认为这不是线程安全的:
workerPool访问也必须同步,因为您同时对其进行读写。除此之外,当你让它并发安全时,你会发现你的实现中没有任何东西保证顺序:所以行可能被安排以任意顺序处理。
标签: csv go concurrency channel