【问题标题】:Spark Error: I/O error constructing remote block reader. java.nio.channels.ClosedByInterruptException at java.nio.channels.ClosedByInterruptExceptionSpark 错误:构造远程块读取器的 I/O 错误。 java.nio.channels.ClosedByInterruptException 在 java.nio.channels.ClosedByInterruptException
【发布时间】:2022-01-06 02:51:52
【问题描述】:

在单元测试中本地执行正常,但是当 Spark Streaming 执行传播到真正的集群执行器时失败,就像它们静默崩溃并且不再可用于上下文:

stream execution thread for kafkaDataGeneratorInactiveESP_02/Distance [id = 438f45a0-acd6-4729-953f-5a18ae208f1f, runId = a98c6d39-fe14-4ed5-b7fe-7e4009de51b2]] impl.BlockReaderFactory (BlockReaderFactory.java:getRemoteBlockReaderFromTcp(765)) - I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:656)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533)
        at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2940)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:822)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:747)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:380)
        at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
        at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:829)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
        at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:56)
        at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:48)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:153)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$getLatest$2(HDFSMetadataLog.scala:190)
        at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:258)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:189)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:300)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)

我尝试的第一件事是更改查询名称,带有斜杠和空格:kafkaDataGeneratorInactiveESP_02/Distance

在将其替换为正确的之后

.queryName("kafkaDataGeneratorInactive" + currentIter.metadata.getString("label"))

100% 证明字符串中有/或空格,错误并没有消失。

【问题讨论】:

    标签: java apache-spark spark-streaming spark-structured-streaming


    【解决方案1】:

    失败的原因实际上是查询名称和检查点位置路径使用了相同的名称(但不是在第一次尝试改进的部分)。后来我又找到了一个错误日志:

    2021-12-01 15:05:46,906 WARN  [main] streaming.StreamingQueryManager (Logging.scala:logWarning(69)) - Stopping existing streaming query [id=b13a69d7-5a2f-461e-91a7-a9138c4aa716, runId=9cb31852-d276-42d8-ade6-9839fa97f85c], as a new run is being started.
    

    为什么查询被停止了?这是因为在 Scala 中,我在循环中创建流式查询,迭代集合,同时保持所有查询名称和所有检查点名称相同。在使它们唯一之后(我只是使用集合中的字符串值),失败问题就消失了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-15
      • 1970-01-01
      • 1970-01-01
      • 2016-01-26
      • 2016-12-24
      相关资源
      最近更新 更多