【发布时间】:2015-09-09 01:11:56
【问题描述】:
我们正在使用聚合器来记录处理过程中的异常情况
public class BigTableWriter extends DoFn<String, Void> {
private Aggregator<Integer, Integer> errorAggregator;
public BigTableWriter(CloudBigtableOptions options) {
errorAggregator = createAggregator("errors",new Sum.SumIntegerFn());
}
@Override
public void processElement(DoFn<String, Void>.ProcessContext c){
try {
....do work here
}
catch(Exception ex){
errorAggregator.addValue(1);
}
}
}
我们希望使其更加细化,而不是保留一个聚合器来收集错误。文档说聚合器通常是在构造函数中创建的。是否可以为我们的 catch 块内的每个异常类型创建一个聚合器?例如,我们想做类似的事情。
public class BigTableWriter extends DoFn<String, Void> {
private Map<String, Aggregator<Integer, Integer> aggregatorMap;
public BigTableWriter(CloudBigtableOptions options) {
aggregatorMap = new HashMap<>();
}
@Override
public void processElement(DoFn<String, Void>.ProcessContext c){
try {
....do work here
}
catch(Exception ex){
aggregateException(ex.getCause().getMessage());
}
}
public void aggregateException(String exceptionMessage) {
Aggregator<Integer, Integer> aggregator = null;
if(!aggregatorMap.containsKey(exceptionMessage){
aggregator = createAggregator(exceptionMessage,new Sum.SumIntegerFn());
}
else {
aggregator = aggregatorMap.get(exceptionMessage);
}
aggregator.put(exceptionMessage, aggregator);
}
}
【问题讨论】:
-
没有直接回答您的问题,但我看到您正在写信给 Bigtable。你试过 Bigtable 团队开发的原生连接器cloud.google.com/bigtable/docs/dataflow-hbase 吗?
-
@jfkk,我们正在使用本机连接器。它是从这个类中调用的
标签: google-cloud-platform google-cloud-dataflow