【发布时间】:2019-10-07 11:00:29
【问题描述】:
我正在创建一个 Apache Beam 流处理管道以在 GCP Dataflow 中运行。我有许多扩展 DoFn 和 CombineFn 的变换。在 DoFn 中,使用 Dataflow 作业详细信息中的 LOGS 窗口可以很好地可视化日志。但是,没有显示来自 CombineFn 转换的日志。
我尝试了不同的日志级别,使用 DirectRunner 也显示良好。
这里是一些示例代码。为简洁起见,我将输入和输出更改为 String,我的代码中有一些自定义类。
import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
@Nullable String id;
}
@Override
public Accum createAccumulator() {
return new Accum();
}
@Override
public Accum addInput(Accum accumulator, String input) {
LOG.info("Add input: id {}, input);
accumulator.id = input;
return accumulator;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accumulators) {
LOG.info("Merging accumulator");
Accum merged = createAccumulator();
for (Accum accumulator : accumulators) {
merged.id = accumulator.id;
}
return merged;
}
@Override
public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
LOG.info("Extracting accumulator");
LOG.info("Extract output: id {}", acummulator.id);
return acummulator.id;
}
}
【问题讨论】:
-
我进行了复制,而 Cloud Console 也发生在我身上。但是,我可以在 Stackdriver Logging 中看到它们。添加some notes 以防万一。
-
好的,当我单击 Dataflow 控制台中的 CombineFn 步骤,然后单击 LOGS 时,那里并没有像您发现的那样。当我单击 StackDriver 链接时,日志仍未显示在 StackDriver 中。但是,当我取消选择 CombineFn 步骤并转到常规 LOGS 和 StackDriver 时,CombineFn 日志就在那里。感谢您的解决方法。
标签: google-cloud-dataflow apache-beam