【Title】: Simple Hive write not working
【Posted】: 2018-07-20 13:45:18
【Question】:

I am trying to write a simple POC with Apache Beam and Hive:

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(PVAOptions.class);

    Pipeline p = Pipeline.create(options);
    p
        .apply(TextIO.read().from("src/test/resources/words.txt"))
        .apply(ParDo.of(new PukeHive()))
        .apply(HCatalogIO.write()
                .withBatchSize(100)
                .withConfigProperties(getHiveConfigProperties())
                .withTable(getHiveTable())
        );
    p.run().waitUntilFinish();
}

static class PukeHive extends DoFn<String, HCatRecord> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        DefaultHCatRecord rec = new DefaultHCatRecord(1);
        rec.set(0, c.element());
        c.output(rec);
    }
}

This results in the following exception. Debugging shows that it happens because Beam's WritableCoder tries to call newInstance() on the abstract class HCatRecord.

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at com.comp.beam.Main.main (Main.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:114)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
    at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
    at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:92)
    at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:54)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:170)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:122)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:105)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:99)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
    at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
    at com.comp.beam.Main$PukeHive$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance (InstantiationExceptionConstructorAccessorImpl.java:48)
    at java.lang.reflect.Constructor.newInstance (Constructor.java:423)
    at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:85)
    at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:54)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:170)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:122)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:105)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:99)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
    at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
    at com.comp.beam.Main$PukeHive$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
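
The InstantiationException at the bottom of the trace can be reproduced without Beam or Hive. The sketch below is only an illustration: BaseRecord and ConcreteRecord are hypothetical stand-ins for HCatRecord and DefaultHCatRecord, and tryInstantiate merely imitates the reflective construction step that the trace shows inside WritableCoder.decode. It is not Beam's actual code.

```java
import java.lang.reflect.Constructor;

public class AbstractNewInstanceDemo {
    // Hypothetical stand-in for the abstract HCatRecord.
    public abstract static class BaseRecord {
        public BaseRecord() {}
    }

    // Hypothetical stand-in for the concrete DefaultHCatRecord.
    public static class ConcreteRecord extends BaseRecord {
        public ConcreteRecord() {}
    }

    // Imitates a coder that creates an empty instance by reflection
    // before reading the serialized fields into it.
    public static String tryInstantiate(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.newInstance();
            return "ok";
        } catch (InstantiationException e) {
            // Thrown when the declaring class is abstract.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(BaseRecord.class));     // prints "InstantiationException"
        System.out.println(tryInstantiate(ConcreteRecord.class)); // prints "ok"
    }
}
```

The abstract type has a constructor, so the lookup succeeds; it is the newInstance() call itself that fails, which matches the innermost cause in the trace.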

How can I get my data into Hive using Beam?

    Tags: hive apache-beam


    【Answer 1】:

    I believe you need to register a coder for HCatRecord, which would be:

    Pipeline p = Pipeline.create(options);
    p.getCoderRegistry()
        .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class));
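
Why this registration helps can be sketched with plain Java. The example below is a toy model under stated assumptions, not Beam's implementation: BaseRecord, ConcreteRecord, ReflectiveCoder and the REGISTRY map are hypothetical names standing in for HCatRecord, DefaultHCatRecord, WritableCoder and the coder registry. It maps the abstract declared type to a coder built on the concrete class, then clones a record by encoding and decoding it, which is the kind of round trip the stack trace shows under CoderUtils.clone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CoderRegistrySketch {
    // Hypothetical stand-in for HCatRecord: abstract, serializes itself.
    public abstract static class BaseRecord {
        public abstract void write(DataOutput out) throws IOException;
        public abstract void readFields(DataInput in) throws IOException;
    }

    // Hypothetical stand-in for DefaultHCatRecord: concrete, no-arg constructor.
    public static class ConcreteRecord extends BaseRecord {
        public String value = "";
        public ConcreteRecord() {}
        public ConcreteRecord(String value) { this.value = value; }
        @Override public void write(DataOutput out) throws IOException { out.writeUTF(value); }
        @Override public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    // Toy coder: decode() creates an empty instance reflectively, then fills it.
    public static class ReflectiveCoder {
        private final Class<? extends BaseRecord> type;
        public ReflectiveCoder(Class<? extends BaseRecord> type) { this.type = type; }

        public byte[] encode(BaseRecord record) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            record.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        }

        public BaseRecord decode(byte[] data) throws IOException, ReflectiveOperationException {
            BaseRecord record = type.getDeclaredConstructor().newInstance();
            record.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return record;
        }
    }

    // Toy registry keyed by the declared element type of the collection.
    private static final Map<Class<?>, ReflectiveCoder> REGISTRY = new HashMap<>();

    public static String roundTrip(String input) {
        // Declared type is abstract, but the coder instantiates the concrete class.
        REGISTRY.put(BaseRecord.class, new ReflectiveCoder(ConcreteRecord.class));
        try {
            ReflectiveCoder coder = REGISTRY.get(BaseRecord.class);
            BaseRecord copy = coder.decode(coder.encode(new ConcreteRecord(input)));
            return ((ConcreteRecord) copy).value;
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints "hello"
    }
}
```

Registering new ReflectiveCoder(BaseRecord.class) instead would make decode() fail with an InstantiationException, mirroring the error in the question.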
    

    To test this, I added the following to the HCatalogIOTest class in the Beam project. It uses a different schema, but it should demonstrate a complete example:

    @Test
    @NeedsEmptyTestTables
    public void testSOKricket() {
      // Register the coder
      defaultPipeline
          .getCoderRegistry()
          .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class));
      defaultPipeline
          .apply(TextIO.read().from("/tmp/words.txt"))
          .apply(ParDo.of(new PukeHive()))
          .apply(
              HCatalogIO.write()
                  .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
                  .withDatabase(TEST_DATABASE)
                  .withTable(TEST_TABLE)
                  .withPartition(new java.util.HashMap<>())
                  .withBatchSize(1L));
      defaultPipeline.run();
    }
    
    
    static class PukeHive extends DoFn<String, HCatRecord> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // our test schema is (mycol1 string, mycol2 int)
        DefaultHCatRecord rec = new DefaultHCatRecord(2);
        rec.set(0, c.element());
        rec.set(1, 1);
        c.output(rec);
      }
    }
    

    【Comments】:

    • OK, thanks! But why doesn't HCatalogIO include this?
    • I'm afraid I haven't dug into it. The other tests use Create.of(buildHCatRecords(TEST_RECORDS_COUNT)) without registering a coder.