【发布时间】:2022-01-06 02:51:52
【问题描述】:
在单元测试中本地执行正常,但是当 Spark Streaming 执行传播到真正的集群执行器时失败,就像它们静默崩溃并且不再可用于上下文:
stream execution thread for kafkaDataGeneratorInactiveESP_02/Distance [id = 438f45a0-acd6-4729-953f-5a18ae208f1f, runId = a98c6d39-fe14-4ed5-b7fe-7e4009de51b2]] impl.BlockReaderFactory (BlockReaderFactory.java:getRemoteBlockReaderFromTcp(765)) - I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:656)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2940)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:822)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:747)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:380)
at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:829)
at java.io.DataInputStream.read(DataInputStream.java:149)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:56)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:48)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:153)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$getLatest$2(HDFSMetadataLog.scala:190)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:258)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:300)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
我尝试的第一件事是更改查询名称,带有斜杠和空格:kafkaDataGeneratorInactiveESP_02/Distance
在将其替换为正确的之后
.queryName("kafkaDataGeneratorInactive" + currentIter.metadata.getString("label"))
100% 证明字符串中有/或空格,错误并没有消失。
【问题讨论】:
标签: java apache-spark spark-streaming spark-structured-streaming