【发布时间】:2020-01-15 06:11:39
【问题描述】:
我有一个用 Go 编写的光束批处理管道,它需要一个包含 2000 万行(大约 600mb 数据)的 .csv 文件,执行 SumPerKey 等基本转换步骤并将输出写回 GCS。
在 Dataflow 上运行管道时,它只调用一个包含 1 个运行器的池!
我希望 Dataflow 能够在多个工作人员之间并行处理这么多数据的工作。我错过了什么吗?
这是我的代码:
func main() {
flag.Parse()
beam.Init()
p, s := beam.NewPipelineWithRoot()
ctx := context.Background()
log.Infof(ctx, "Started pipeline on scope: %s", s)
/* [TEST PIPELINE START ]*/
sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))
pwo := beam.ParDo(s.Scope("Pair Key With One"),
func(x Rating, emit func(int, int)) {
emit(x.UserId, 1)
}, sr)
spk := stats.SumPerKey(s, pwo)
mp := beam.ParDo(s.Scope("Map KV To Struct"),
func(k int, v int, emit func(UserRatings)) {
emit(UserRatings{
UserId: k,
Ratings: v,
})
}, spk)
t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })
o := beam.ParDo(s, func(x []UserRatings) string {
if data, err := json.MarshalIndent(x, "", ""); err != nil {
return fmt.Sprintf("[Err]: %v", err)
} else {
return fmt.Sprintf("Output: %s", data)
}
}, t)
textio.Write(s, *output, o)
/* [TEST PIPELINE END ]*/
if err := beamx.Run(ctx, p); err != nil {
fmt.Println(err)
log.Exitf(ctx, "Failed to execute job: on ctx=%v:")
}
}
我通过这个命令行部署管道:
go run main.go \
--runner dataflow \
--max_num_workers 10 \
--file gs://${BUCKET?}/ratings.csv \
--output gs://${BUCKET?}/reporting.txt \
--project ${PROJECT?} \
--temp_location gs://${BUCKET?}/tmp/ \
--staging_location gs://${BUCKET?}/binaries/ \
--worker_harness_container_image=gcr.io/drawndom-app/beam/go:latest
注意:当我将--num_workers 设置为 5 时,它会调用 5 个工作人员,但我希望它自动执行此操作。
【问题讨论】:
标签: go google-cloud-dataflow apache-beam