【Question title】: Unable to encode element with HBaseMutationCoder
【Posted】: 2017-09-01 22:05:20
【Question description】:

I am trying to apply mutations (increments) to Bigtable from Dataflow using cloud-bigtable-client (https://github.com/GoogleCloudPlatform/cloud-bigtable-client).

Here is a high-level summary of what my job does:

  PCollection<SomeData> somedata = ...;
  somedata.apply(ParDo.of(new CreateMutations()))
      .setCoder(new HBaseMutationCoder()).apply(CloudBigtableIO.writeToTable(config));
  // I don't think it is necessary to explicitly set Coder here; I tried both ways.

CreateMutations is a DoFn that looks like this:

// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
  Increment mutation = new Increment(c.element().getKey().getBytes());
  for (SomeData data : c.element().getValue()) {
    // Obtain cf (String), qual (String), value (long) from data.
    // None of them is null. 
    mutation.addColumn(cf.getBytes(), qual.getBytes(), value);
  }
  c.output(mutation);
}

Surprisingly, the job fails while executing this DoFn because HBaseMutationCoder is unable to encode the element. Here is a small part of the stack trace:

(e8a8d266ed05e19f): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/a:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}), (family=m, columns={some_string/m:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/m:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}}' with coder 'HBaseMutationCoder'.
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
    at  ......

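Note that the error message clearly shows that the row, family, column qualifier, and values are all populated. This particular message shows four cells to be incremented. I have had no trouble with Deletes and Puts, but this is my first time using Increments. Is there anything I need to populate besides the row, family, qualifier, and value?

Any help would be greatly appreciated.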

I also tried using Put instead of Increment, and it worked (the code is the same as above except for the two lines marked (*)).

// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
  Put mutation = new Put(c.element().getKey().getBytes()); //(*)
  for (SomeData data : c.element().getValue()) {
    // Obtain cf (String), qual (String), value (long) from data.
    // None of them is null. 
    mutation.addImmutable(cf.getBytes(), qual.getBytes(), Bytes.toBytes(value)); //(*)
  }
  c.output(mutation);
}

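(I found a related question here: How to load data into Google Cloud Bigtable from Google BigQuery. However, my problem does not seem to be caused by null values, since every row, column family, qualifier, and value is populated.)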


Update: here is the full stack trace I got.

    (875583981e325b46): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:369)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
    ... 24 more
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
    at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:170)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:185)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:641)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:552)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:351)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
    at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeW

【Comments】:

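  • Do you see a "Caused by: ..." in the stack trace? Could you share it? That might narrow down what is happening here.
  • Let me update the question with the full stack trace; I'm not sure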

Tags: google-cloud-dataflow google-cloud-bigtable


【Solution 1】:

OK, I see that the IllegalArgumentException is thrown from HBaseMutationCoder.java. Looking at https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java#L68 , INCREMENT is not supported because it is not idempotent (PUT is), which explains what you are seeing.
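
To illustrate why idempotence matters here: a runner such as Dataflow may re-execute work after a failure, so the same mutation can reach the table more than once. The sketch below is a toy model in plain Java, not code from HBase, Beam, or cloud-bigtable-client. The class `IdempotenceDemo`, the helpers `applyPut`, `applyIncrement`, and `replay`, the cell name, and the starting value are all made up for illustration. It only shows that replaying a Put-like write leaves the same result, while replaying an Increment-like write does not.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a "table" is a map from a cell name to a long value.
// None of these names come from HBase, Beam, or cloud-bigtable-client.
class IdempotenceDemo {

    // Put-like write: sets the cell to an absolute value.
    static void applyPut(Map<String, Long> table, String cell, long value) {
        table.put(cell, value);
    }

    // Increment-like write: adds a delta to whatever is already stored.
    static void applyIncrement(Map<String, Long> table, String cell, long delta) {
        table.merge(cell, delta, Long::sum);
    }

    // Applies the same write `attempts` times, as a retried unit of work
    // would, and returns the final value of the cell.
    static long replay(boolean usePut, int attempts) {
        Map<String, Long> table = new HashMap<>();
        table.put("a:2", 10L); // hypothetical starting value
        for (int i = 0; i < attempts; i++) {
            if (usePut) {
                applyPut(table, "a:2", 11L);
            } else {
                applyIncrement(table, "a:2", 1L);
            }
        }
        return table.get("a:2");
    }

    public static void main(String[] args) {
        System.out.println("put, 1 attempt:        " + replay(true, 1));
        System.out.println("put, 2 attempts:       " + replay(true, 2));
        System.out.println("increment, 1 attempt:  " + replay(false, 1));
        System.out.println("increment, 2 attempts: " + replay(false, 2));
    }
}
```

In this toy model both Put runs end at 11, while the Increment runs end at 11 and 12. Note that this also means swapping Increment for Put, as in the question, changes the semantics from "add to the stored value" to "overwrite the stored value".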
