【Question title】: Get unexpected exception when using Dataflow
【Posted】: 2020-02-20 18:56:03
【Question description】:

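I run a Dataflow job with the following steps:
- Read from BigQuery
- Convert each TableRow to a JSON string
- Insert into Elasticsearch (7.5.2)

It works fine for about 100k records, but on the real data (8M records, ~65 GB) the job throws an exception after inserting about 300k records.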

Error message from worker: java.lang.RuntimeException: unexpected
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:104)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntries(BatchingShuffleEntryReader.java:125)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntriesIfNeeded(BatchingShuffleEntryReader.java:119)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.hasNext(BatchingShuffleEntryReader.java:84)
    org.apache.beam.runners.dataflow.worker.util.common.ForwardingReiterator.hasNext(ForwardingReiterator.java:63)
    org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator.advance(GroupingShuffleEntryIterator.java:109)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.advance(GroupingShuffleReader.java:272)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.start(GroupingShuffleReader.java:266)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}).
Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:196)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2312)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:101)
    ... 21 more
Caused by: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}).
Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)
    org.apache.beam.runners.dataflow.worker.ChunkingShuffleBatchReader.read(ChunkingShuffleBatchReader.java:58)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:70)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:66)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    ... 27 more

I also applied the suggestions from https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout (increase the disk size and the number of workers), but the job still fails. My current configuration:

--runner=DataflowRunner \
      --numWorkers=10 \
      --maxNumWorkers=20 \
      --diskSizeGb=150 \
      --workerMachineType=n1-standard-1 \
      --region=asia-east1" && \

---Update 1: Stackdriver log (screenshot not available)

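【Comments】:

  • Could you check Stackdriver to see which errors or exceptions it reports?
  • Thanks for the reply, but I think this is all I have right now.
  • Could you add this flag to your command line: --dry_run (link to the dry_run documentation)? Let me know if it works.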

Tags: elasticsearch google-cloud-dataflow apache-beam


【Solution 1】:

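Thanks for the replies, the issue is resolved. The root cause was the firewall rules on the VM instances created by Dataflow: the workers could not reach each other, so we had to allow connections between them.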
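For anyone hitting the same error, here is a rough sketch of what such a rule could look like with gcloud. It assumes the workers carry the network tag `dataflow` (which Dataflow normally applies to its worker VMs) and that worker-to-worker traffic uses TCP ports 12345-12346 (12346 is the port in the error above). The rule name and network are placeholders I made up, not values from my project. The script only prints the command unless you set APPLY=1.

```shell
#!/bin/sh
# Sketch only: RULE_NAME and NETWORK are hypothetical placeholders; adjust to your project.
RULE_NAME="${RULE_NAME:-allow-dataflow-internal}"
NETWORK="${NETWORK:-default}"

# Print a command for review; execute it only when APPLY=1.
run() {
  echo "+ $*"
  if [ "${APPLY:-0}" = "1" ]; then
    "$@"
  fi
}

# Allow ingress between VMs tagged "dataflow" on the worker-to-worker ports.
run gcloud compute firewall-rules create "$RULE_NAME" \
  --network "$NETWORK" \
  --direction INGRESS \
  --action allow \
  --source-tags dataflow \
  --target-tags dataflow \
  --rules tcp:12345-12346
```

Run it once as-is to check the printed command, then again with APPLY=1 to create the rule. If your job runs in a custom VPC, NETWORK has to be the network the workers are actually attached to.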

【Comments】:
