【问题标题】:Apache Beam Go SDK - Dataflow doesn't auto-scale properly (parallelize steps)Apache Beam Go SDK - 数据流无法正确自动扩展(并行步骤)
【发布时间】:2020-01-15 06:11:39
【问题描述】:

我有一个用 Go 编写的光束批处理管道,它需要一个包含 2000 万行(大约 600mb 数据)的 .csv 文件,执行 SumPerKey 等基本转换步骤并将输出写回 GCS。

在 Dataflow 上运行管道时,它只调用一个包含 1 个运行器的池!

我希望 Dataflow 能够在多个工作人员之间并行处理这么多数据的工作。我错过了什么吗?

这是我的代码:

func main() {
    flag.Parse()

    beam.Init()

    p, s := beam.NewPipelineWithRoot()

    ctx := context.Background()

    log.Infof(ctx, "Started pipeline on scope: %s", s)

    /* [TEST PIPELINE START ]*/

    sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))

    pwo := beam.ParDo(s.Scope("Pair Key With One"),
        func(x Rating, emit func(int, int)) {
            emit(x.UserId, 1)
        }, sr)

    spk := stats.SumPerKey(s, pwo)

    mp := beam.ParDo(s.Scope("Map KV To Struct"),
        func(k int, v int, emit func(UserRatings)) {
            emit(UserRatings{
                UserId:  k,
                Ratings: v,
            })
        }, spk)

    t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })

    o := beam.ParDo(s, func(x []UserRatings) string {
        if data, err := json.MarshalIndent(x, "", ""); err != nil {
            return fmt.Sprintf("[Err]: %v", err)
        } else {
            return fmt.Sprintf("Output: %s", data)
        }
    }, t)

    textio.Write(s, *output, o)

    /* [TEST PIPELINE END ]*/

    if err := beamx.Run(ctx, p); err != nil {
        fmt.Println(err)
        log.Exitf(ctx, "Failed to execute job: on ctx=%v:")
    }
}

Full Code Here

我通过这个命令行部署管道:

go run main.go \
  --runner dataflow \
  --max_num_workers 10 \
  --file gs://${BUCKET?}/ratings.csv \
  --output gs://${BUCKET?}/reporting.txt \
  --project ${PROJECT?} \
  --temp_location gs://${BUCKET?}/tmp/ \
  --staging_location gs://${BUCKET?}/binaries/ \
  --worker_harness_container_image=gcr.io/drawndom-app/beam/go:latest

注意:当我将--num_workers 设置为 5 时,它会调用 5 个工作人员,但我希望它自动执行此操作。

【问题讨论】:

    标签: go google-cloud-dataflow apache-beam


    【解决方案1】:

    更新:

    感谢lib,我在 .csv 输入之前添加了一个 Reshuffle 步骤,并且 Dataflow 能够通过再添加 1 个工作人员来进行自动缩放。

    我仍然需要了解如何优化我的管道的并行性。

    使用的代码:

    func Reshuffle(s beam.Scope, col beam.PCollection) beam.PCollection {
        s = s.Scope("Reshuffle")
    
        col = beam.ParDo(s, func(x beam.X) (int, beam.X) {
            return rand.Int(), x
        }, col)
        col = beam.GroupByKey(s, col)
        return beam.ParDo(s, func(key int, values func(*beam.X) bool, emit func(beam.X)) {
            var x beam.X
            for values(&x) {
                emit(x)
            }
        }, col)
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-29
      • 1970-01-01
      • 2019-08-29
      • 2016-02-18
      • 1970-01-01
      相关资源
      最近更新 更多