【问题标题】:Apache Flink fromCollection java.lang.IllegalStateException: unread block dataApache Flink fromCollection java.lang.IllegalStateException:未读块数据
【发布时间】:2016-01-30 19:27:11
【问题描述】:

我正在使用 Scala 和 Flink 1.0-SNAPSHOT 在 DataSet 上执行 leftOuterJoin,但出现以下异常:

    11:54:15,921 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

我使用一个简单的 Scala 案例类作为 DataSet 的类型:

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

我使用以下方法生成案例类实例:

def getRawValuesFromZipFile(fileName: String) : Array[RawValue]

我初始化环境并通过以下方式创建 DataSet[RawValue]:

val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print

我怀疑是序列化问题导致了错误,我正在使用 Scala 2.10.5 和 Java 7 系统库来编译项目。我使用的是 Eclipse,项目是由示例项目生成脚本生成的。

任何有关解决问题的帮助或提示将不胜感激:-) 谢谢, 丹尼尔

【问题讨论】:

  • 您从 zip 文件中读取的数据有多大?
  • 大约 22Mb 压缩、90Mb 未压缩和 3065295 条 RawValue 记录。谢谢!
  • 如果我在 4 个元素的硬编码样本 Array[RawValue] 上运行它,则连接运行良好。进一步调查。
  • 另外,如果我拍摄(20000),它也可以。可能我正在使用完整的输入数据集达到堆限制。现在我需要弄清楚如何以一种有意义且可行的方式拆分输入数据 :-) 谢谢您的提示 :-)

标签: java scala serialization apache-flink


【解决方案1】:

env.fromCollection() 调用可能并不适合您的用例。如果数据变得很大,它会中断,因为数据是随作业一起提供的。数据不会在工作节点上并行读取。

你可以看看这个:https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-compressed-files 看看它是否适合你的情况。它只支持 gzip,但也许你可以用那种格式压缩你的数据。

【讨论】:

    猜你喜欢
    • 2014-07-05
    • 2016-04-26
    • 1970-01-01
    • 2018-05-09
    • 2017-07-25
    • 1970-01-01
    • 2018-02-01
    • 1970-01-01
    • 2018-05-13
    相关资源
    最近更新 更多