【发布时间】:2019-11-27 03:52:42
【问题描述】:
我们有一个 flink 流作业,它从 kafka 读取数据并将其接收到 S3。我们使用 flink 的内部流文件接收器 API 来实现这一点。但是,几天后,作业失败并且无法从失败中恢复。该消息说它无法从 s3 中找到 tmp 文件。我们想知道可能的根本原因是什么,因为我们真的不想丢失任何数据。
谢谢。
整个输出是这样的
java.io.FileNotFoundException: No such file or directory: s3://bucket_name/_part-0-282_tmp_b9777494-d73b-4141-a4cf-b8912019160e
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:99)
at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
【问题讨论】:
-
您好@胡广,能否请您粘贴整个错误消息和您用于定义接收器的代码 sn-p?
-
@TobiSH 我已经粘贴了上面的整个消息。谢谢
-
您好,我在 Kinesis Analytics 和 S3 上使用 Flink 时遇到了同样的问题。有任何更新或解决方案吗?就我而言,快照/重新部署后我无法恢复