【问题标题】:GroupByKey always holds holds everything in RAM, causing OOMGroupByKey 始终将所有内容保存在 RAM 中,导致 OOM
【发布时间】:2021-11-17 02:28:28
【问题描述】:

我正在编写一个流水线代码,该代码将在 DataFlow 的批处理模式和流模式下使用,并且在批处理模式下使用 GroupByKey 时遇到 OOM 问题。下面的代码显示了这个问题:当我有一个大文件时,GroupByKey 似乎将所有内容都保存在内存中,仅在输入完成后才发出值。我尝试使用触发器来强制触发事件,但失败了。我找不到在大文件上使用此转换的任何方法。

如何在 beam go 中实现一个包含分组并且可以有效处理大文件的管道?

package sisubqio_test

import (
    "context"
    "flag"
    "fmt"
    "io"
    "os"
    "strings"
    "sync/atomic"
    "testing"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func TestWriter(t *testing.T) {
    mustNotFail := func(err error) {
        if err != nil {
            t.Fatal(err)
        }
    }

    // test file with a few lines of text
    fName := "in.tmp.txt"
    f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
    mustNotFail(err)
    defer func() {
        mustNotFail(f.Close())
        mustNotFail(os.Remove(fName))
    }()
    for i := 0; i < 10; i++ {
        _, err = fmt.Fprintf(f, "line %d\n", i)
        mustNotFail(err)
    }

    _, err = f.Seek(0, io.SeekStart)
    mustNotFail(err)

    flag.Parse()
    beam.Init()

    pipeline, s := beam.NewPipelineWithRoot()
    col := textio.Read(s, fName)

    // add timestamp to messages: each message has a timestamp 20s after
    // the previous one
    now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
    var counter int32
    col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
        i := atomic.AddInt32(&counter, 1) - 1
        evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
        t.Logf("[0] input event, time=%v", evTime)
        return evTime, line
    }, col)

    // add a window and inspect events, when emitted
    col = beam.WindowInto(s,
        window.NewFixedWindows(time.Minute),
        col,
        beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
    )
    col = beam.ParDo(s, func(w typex.Window, e string) string {
        t.Logf("[1] window: %v", w)
        return e
    }, col)

    // add a key and group by it; inspect events, when emitted
    col = beam.AddFixedKey(s, col)
    col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
        t.Logf("[2] at %v got (group %d)",
            time.UnixMilli(int64(et)),
            group)
        return group, x
    }, col)

    // ISSUE IS HERE
    // It doesn't matter the trigger I use, it looks like GroupByKey
    // always wants to hold everything into memory and only then
    // emit it's outputs. With large files is always OOMs.
    col = beam.GroupByKey(s, col)
    beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
        sb := strings.Builder{}
        fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
        var elm string
        for valIter(&elm) {
            fmt.Fprintf(&sb, " %s;", elm)
        }
        t.Log(sb.String())
    }, col)

    mustNotFail(beamx.Run(context.Background(), pipeline))
}

输出:

    writer_test.go:58: [0] input event, time=1577836800000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836820000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836840000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836860000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836880000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836900000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836920000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836940000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836960000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836980000
    writer_test.go:69: [1] window: [1577836980000:1577837040000)
    writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
    writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
    writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
    writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
    writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;

编辑:我发现与 triggerswindows 相关的 Jira 票证,在撰写本文时,请相信触发器,特别是触发器传播是 WIP。

【问题讨论】:

    标签: go google-cloud-platform apache-beam google-dataflow


    【解决方案1】:

    Beam 使用 map 和 reduce 操作。 Map(转换)可以在不同的工作人员/VM 上并行完成。 Reduce 需要知道所有要执行的元素,因此它将所有元素加载到内存中,然后执行 reduce groupBy 操作。

    您有 2 个解决方案:

    • 您可以创建窗口来仅处理大文件的块。但是,您的 groupBy 不会是全局的,而是每个窗口的。

    • 您也可以尝试新的Dataflow 主要选项。它是无服务器且完全可扩展的。承诺是消除所有 OOM 错误(我只在 Java 中得到,我从不使用 Beam Go SDK)

    您也可以增加工作人员的内存,但这不是一个可扩展的解决方案(而且成本更高!)。首选是不错的选择(但仍处于预览阶段)

    【讨论】:

    • 感谢您的回复。我发布的示例代码有一个每分钟的窗口。我面临的问题是它看起来仍然在 RAM 中保存所有内容,然后它计算每个窗口的聚合。我相信这个问题是 go sdk 特有的,因为在 Java 中我不会使用 GroupByKey,而是使用更酷的方式 Group&lt;&gt;.byFieldNames().aggregateFields().apply(),这在 go 中不可用。看起来我唯一的选择是在一个阶段从我的数据中计算一个键,然后在另一个阶段通过它进行聚合,最后在从键中迭代行时使用 DoFn 手动计算我的聚合。
    • 等等,我知道它适用于流式处理和批处理。但是当您有一个大文件要处理时(我理解,批处理,因为它是一个批处理),您就会遇到 OOM 问题。在批处理模式下,您需要作弊创建窗口并为您的行添加时间戳,以允许窗口函数管理一堆行。否则,您只有一个全局窗口,即使您在代码中使用窗口,也只会创建一个窗口,并且您在单个节点上处理内存中的所有数据。我的理解正确吗?
    • 你完全搞定了。上面的代码是批处理模式的示例,实际代码更复杂,并且正在编写以在流(来自 pubsub)和批处理(来自 gcs)中工作。示例代码为每条消息分配时间戳,应用 1 分钟的固定窗口,添加一个键并按其分组。我很清楚,为消息添加时间戳、应用窗口和添加密钥都可以正常工作。问题出在GroupByKey 上,我认为应该在每个窗口关闭后发出它的输出,但在这里它在所有 4 个窗口之后发出输出。
    猜你喜欢
    • 2019-09-28
    • 1970-01-01
    • 1970-01-01
    • 2011-07-28
    • 1970-01-01
    • 1970-01-01
    • 2012-04-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多