【问题标题】:How to read log messages for CombineFn function in GCP Dataflow?如何在 GCP Dataflow 中读取 CombineFn 函数的日志消息?
【发布时间】:2019-10-07 11:00:29
【问题描述】:

我正在创建一个 Apache Beam 流处理管道以在 GCP Dataflow 中运行。我有许多扩展 DoFn 和 CombineFn 的变换。在 DoFn 中,使用 Dataflow 作业详细信息中的 LOGS 窗口可以很好地可视化日志。但是,没有显示来自 CombineFn 转换的日志。

我尝试了不同的日志级别,使用 DirectRunner 也显示良好。

这里是一些示例代码。为简洁起见,我将输入和输出更改为 String,我的代码中有一些自定义类。

import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
    private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        @Nullable String id;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, String input) {
        LOG.info("Add input: id {}, input);

        accumulator.id = input;

        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        LOG.info("Merging accumulator");

        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            merged.id = accumulator.id;            
        }

        return merged;
    }

    @Override
    public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
        LOG.info("Extracting accumulator");

        LOG.info("Extract output: id {}", acummulator.id);

        return acummulator.id;
    }
}

【问题讨论】:

  • 我进行了复制,而 Cloud Console 也发生在我身上。但是,我可以在 Stackdriver Logging 中看到它们。添加some notes 以防万一。
  • 好的,当我单击 Dataflow 控制台中的 CombineFn 步骤,然后单击 LOGS 时,那里并没有像您发现的那样。当我单击 StackDriver 链接时,日志仍未显示在 StackDriver 中。但是,当我取消选择 CombineFn 步骤并转到常规 LOGS 和 StackDriver 时,CombineFn 日志就在那里。感谢您的解决方法。

标签: google-cloud-dataflow apache-beam


【解决方案1】:

Apache Beam CombineFn 操作在 Dataflow 中跨多个步骤执行。 (具体来说,在将所有结果改组到单个键之前会进行尽可能多的预组合,然后在后续的后 GBK 步骤中将所有上游结果合并到最终结果中。)没有单个执行“步骤”的事实对应于图中的原始组合步骤可能是阻止找到日志的原因。

这是一个错误,应该修复。如前所述,一种解决方法是查看管道中的所有日志。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-27
    • 2020-10-16
    • 1970-01-01
    • 2021-04-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多